[jira] [Assigned] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2024-10-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15908:
-

Assignee: (was: Kirk True)

> Remove deprecated Consumer API poll(long timeout)
> -
>
> Key: KAFKA-15908
> URL: https://issues.apache.org/jira/browse/KAFKA-15908
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 4.0.0
>
>
> Per 
> [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior],
>  the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal.
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the {{poll}} method that takes a single {{long}} timeout will be 
> removed altogether.
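
For readers preparing for 4.0, a minimal migration sketch, assuming a running broker; the topic name and connection settings are illustrative and not taken from the issue:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));

            // Removed in 4.0 (deprecated since 2.0.0): consumer.poll(100L);
            // The KIP-266 replacement bounds all blocking, including metadata fetches:
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
{code}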



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)

2024-10-03 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17886780#comment-17886780
 ] 

Kirk True commented on KAFKA-15908:
---

I've unassigned myself, so whoever is willing and available can take it over. 
Thanks :)

> Remove deprecated Consumer API poll(long timeout)
> -
>
> Key: KAFKA-15908
> URL: https://issues.apache.org/jira/browse/KAFKA-15908
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, timeout
> Fix For: 4.0.0
>
>
> Per 
> [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior],
>  the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. 
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will 
> log a warning about its removal.
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the {{poll}} method that takes a single {{long}} timeout will be 
> removed altogether.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17686) AsyncKafkaConsumer.offsetsForTimes() fails with NullPointerException

2024-10-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17686:
--
Summary: AsyncKafkaConsumer.offsetsForTimes() fails with 
NullPointerException  (was: AsyncKafkaConsumer.offsetsForTimes fails with 
NullPointerException)

> AsyncKafkaConsumer.offsetsForTimes() fails with NullPointerException
> 
>
> Key: KAFKA-17686
> URL: https://issues.apache.org/jira/browse/KAFKA-17686
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 4.0.0
>
>
> Error when running the integration test:
> {noformat}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > 
> PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) 
> > "testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED
> java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()"
>  because the return value of "java.util.Map$Entry.getValue()" is null
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082)
> at 
> java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
> at 
> java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at 
> java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080)
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560)
> at 
> kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535)
>  
> {noformat}
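
For context, a standalone sketch of the failure mode rather than the actual Kafka internals: JDK {{Collectors.toMap}}-style accumulators throw NullPointerException on null values, which matches the {{uniqKeysMapAccumulator}} frame in the trace above. The workaround shown is one hypothetical option, not the committed fix:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class NullValueCollectSketch {
    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 100L);
        offsets.put("topic-1", null); // e.g. no offset found for the requested timestamp

        // Throws NullPointerException, mirroring the stack trace above:
        // offsets.entrySet().stream()
        //        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        // One null-tolerant alternative: accumulate into a HashMap directly.
        Map<String, Long> copy = offsets.entrySet().stream()
            .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll);
        System.out.println(copy); // {topic-0=100, topic-1=null}
    }
}
{code}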



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17686) AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException

2024-10-02 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17686:
--
Summary: AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException 
 (was: PlaintextAdminIntegrationTest’s testOffsetsForTimesAfterDeleteRecords 
fails with NPE with new consumer)

> AsyncKafkaConsumer.offsetsForTimes fails with NullPointerException
> --
>
> Key: KAFKA-17686
> URL: https://issues.apache.org/jira/browse/KAFKA-17686
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 4.0.0
>
>
> Error when running the integration test:
> {noformat}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > 
> PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) 
> > "testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED
> java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()"
>  because the return value of "java.util.Map$Entry.getValue()" is null
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082)
> at 
> java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
> at 
> java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at 
> java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080)
> at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560)
> at 
> kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535)
>  
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17686) PlaintextAdminIntegrationTest’s testOffsetsForTimesAfterDeleteRecords fails with NPE with new consumer

2024-10-02 Thread Kirk True (Jira)
Kirk True created KAFKA-17686:
-

 Summary: PlaintextAdminIntegrationTest’s 
testOffsetsForTimesAfterDeleteRecords fails with NPE with new consumer
 Key: KAFKA-17686
 URL: https://issues.apache.org/jira/browse/KAFKA-17686
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Affects Versions: 3.9.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


Error when running the integration test:

{noformat}
Gradle Test Run :core:integrationTest > Gradle Test Executor 10 > 
PlaintextAdminIntegrationTest > testOffsetsForTimesAfterDeleteRecords(String) > 
"testOffsetsForTimesAfterDeleteRecords(String).quorum=kraft" FAILED
java.lang.NullPointerException: Cannot invoke 
"org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal.buildOffsetAndTimestamp()"
 because the return value of "java.util.Map$Entry.getValue()" is null
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.lambda$offsetsForTimes$4(AsyncKafkaConsumer.java:1082)
at 
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
at 
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
 
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1080)
at 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.offsetsForTimes(AsyncKafkaConsumer.java:1043)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1560)
at 
kafka.api.PlaintextAdminIntegrationTest.testOffsetsForTimesAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:1535)
 
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case

2024-09-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885936#comment-17885936
 ] 

Kirk True commented on KAFKA-16109:
---

I think we can push it to post-4.0 and/or lower the priority. Thanks!

> Write system tests cover the "simple consumer + commit" use case
> 
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17647) Check PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe really ignore ErrorEvent

2024-09-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17647:
--
Component/s: clients
 consumer

> Check 
> PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe 
> really ignore ErrorEvent
> 
>
> Key: KAFKA-17647
> URL: https://issues.apache.org/jira/browse/KAFKA-17647
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> Follow-up for [https://github.com/apache/kafka/pull/17244].
> We only check that unsubscribe can succeed, but this call may run too fast, so 
> there may be no further ErrorEvent. We could check the new 
> background-event-queue-size metric to make sure there is a new ErrorEvent and 
> that unsubscribe can ignore it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17648) AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException

2024-09-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17648:
--
Component/s: clients
 consumer

> AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException
> --
>
> Key: KAFKA-17648
> URL: https://issues.apache.org/jira/browse/KAFKA-17648
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> Follow-up for [https://github.com/apache/kafka/pull/17244].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17623:
--
Component/s: clients

> Flaky 
> testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
> 
>
> Key: KAFKA-17623
> URL: https://issues.apache.org/jira/browse/KAFKA-17623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Flaky for the new consumer, failing with:
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error at 
> app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)
> ...
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition topic-0 at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)
>  
> Flaky behaviour:
>  
> https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17618:
--
Component/s: (was: clients)
 (was: consumer)

> group consumer heartbeat interval should be less than session timeout
> -
>
> Key: KAFKA-17618
> URL: https://issues.apache.org/jira/browse/KAFKA-17618
> Project: Kafka
>  Issue Type: Task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848
>
> [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
>  mentions:
> bq. The member is expected to heartbeat every 
> group.consumer.heartbeat.interval.ms in order to keep its session opened. If 
> it does not heartbeat at least once within the 
> group.consumer.session.timeout.ms, the group coordinator will kick the member 
> out from the group.
> To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be 
> larger than _group.consumer.session.timeout.ms_, we can add validation for it.
> We can do similar validation for _group.share.heartbeat.interval.ms_ and 
> _group.share.session.timeout.ms_ as well.
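
A minimal sketch of the proposed check; the config names come from KIP-848, but the {{validate}} helper and its call sites are hypothetical:

{code:java}
public class GroupConfigValidatorSketch {
    // Hypothetical helper; config names are from KIP-848.
    static void validate(int heartbeatIntervalMs, int sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                "group.consumer.heartbeat.interval.ms (" + heartbeatIntervalMs + ") must be "
                    + "less than group.consumer.session.timeout.ms (" + sessionTimeoutMs + ")");
        }
    }

    public static void main(String[] args) {
        validate(5000, 45000); // OK: several heartbeats fit within the session window
        try {
            validate(60000, 45000); // rejected: the member could never heartbeat in time
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}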



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17618:
--
Component/s: clients
 consumer

> group consumer heartbeat interval should be less than session timeout
> -
>
> Key: KAFKA-17618
> URL: https://issues.apache.org/jira/browse/KAFKA-17618
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848
>
> [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
>  mentions:
> bq. The member is expected to heartbeat every 
> group.consumer.heartbeat.interval.ms in order to keep its session opened. If 
> it does not heartbeat at least once within the 
> group.consumer.session.timeout.ms, the group coordinator will kick the member 
> out from the group.
> To prevent users from configuring _group.consumer.heartbeat.interval.ms_ to be 
> larger than _group.consumer.session.timeout.ms_, we can add validation for it.
> We can do similar validation for _group.share.heartbeat.interval.ms_ and 
> _group.share.session.timeout.ms_ as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout

2024-09-25 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884834#comment-17884834
 ] 

Kirk True commented on KAFKA-17518:
---

Thanks! :)

> AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
> -
>
> Key: KAFKA-17518
> URL: https://issues.apache.org/jira/browse/KAFKA-17518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: TengYao Chi
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot 
> complete, leading to the consumer remaining in the consumer group.
> On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the 
> consumer group. This process requires hops back and forth between the 
> application and background threads to call the 
> {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to 
> the close step.
> The events used to communicate between the application and background threads 
> are based on the timeout provided by the user. If the timeout is not 
> sufficient, the events will expire, and the process will be left incomplete.
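
To illustrate the user-facing side, a sketch under the assumption of a subscribed consumer; the timeout values are illustrative:

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CloseTimeoutSketch {
    static void close(KafkaConsumer<String, String> consumer) {
        // Risky: a zero deadline gives the application/background thread hops no
        // time to run the ConsumerRebalanceListener, so the leave-group request
        // may never be sent and the member lingers in the group.
        // consumer.close(Duration.ZERO);

        // Safer: leave budget for the unsubscribe/leave-group round trips.
        consumer.close(Duration.ofSeconds(30));
    }
}
{code}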



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout

2024-09-25 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884832#comment-17884832
 ] 

Kirk True commented on KAFKA-17518:
---

[~frankvicky]—as [~lianetm] mentioned, this Jira came out of the PR discussion 
from [the pull request for 
KAFKA-16985|https://github.com/apache/kafka/pull/16686]. My personal preference 
would be to encourage you to join the discourse in that PR so that we arrive at 
consistent approaches for these various edge cases.

> AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
> -
>
> Key: KAFKA-17518
> URL: https://issues.apache.org/jira/browse/KAFKA-17518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: TengYao Chi
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot 
> complete, leading to the consumer remaining in the consumer group.
> On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the 
> consumer group. This process requires hops back and forth between the 
> application and background threads to call the 
> {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to 
> the close step.
> The events used to communicate between the application and background threads 
> are based on the timeout provided by the user. If the timeout is not 
> sufficient, the events will expire, and the process will be left incomplete.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition

2024-09-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16778:
--
Labels: kip-848-client-support  (was: )

> AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked 
> partition
> -
>
> Key: KAFKA-16778
> URL: https://issues.apache.org/jira/browse/KAFKA-16778
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
>  
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition 
> output-topic-26
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411)
>     at 
> org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>     at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>     at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>     at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94)
>  {code}
> The setup: 30 consumers consuming from a topic with 300 partitions.
> We can occasionally get an IllegalStateException from the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition

2024-09-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16778:
--
Component/s: clients
 consumer

> AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked 
> partition
> -
>
> Key: KAFKA-16778
> URL: https://issues.apache.org/jira/browse/KAFKA-16778
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>
>  
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition 
> output-topic-26
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411)
>     at 
> org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>     at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>     at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>     at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>     at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94)
>  {code}
> The setup: 30 consumers consuming from a topic with 300 partitions.
> We can occasionally get an IllegalStateException from the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16966) Allow offset commit fetch to reuse previous request if partitions are a subset

2024-09-25 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884828#comment-17884828
 ] 

Kirk True commented on KAFKA-16966:
---

[~taijuwu]—feel free to take it :)

> Allow offset commit fetch to reuse previous request if partitions are a subset
> --
>
> Key: KAFKA-16966
> URL: https://issues.apache.org/jira/browse/KAFKA-16966
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> In {{{}initWithCommittedOffsetsIfNeeded{}}}, the behavior of the existing 
> {{LegacyKafkaConsumer}} is to allow reuse only if the partitions for the 
> _current_ request equal those of the _previous_ request *exactly* 
> ([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java?rgh-link-date=2024-06-14T16%3A43%3A11Z#L927]).
>  That logic is the basis for the behavior used in the 
> {{{}AsyncKafkaConsumer{}}}.
> The proposed change is to allow for request reuse if the partitions for the 
> _current_ request are a subset of those of the _previous_ request. This 
> introduces a subtle difference in behavior between the two {{Consumer}} 
> implementations, so we need to decide if we want to change both 
> implementations or just {{{}AsyncKafkaConsumer{}}}. Also, the specific case 
> that the request reuse logic solves is when the user has passed in a very low 
> timeout value in a tight {{poll()}} loop, which suggests the partitions 
> wouldn't be changing between those loops.
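
A sketch of the proposed relaxation, separate from the actual {{ConsumerCoordinator}} code; the helper name is hypothetical:

{code:java}
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class OffsetFetchReuseSketch {
    // Existing behavior allows reuse only when previous.equals(current);
    // the proposal relaxes this to "current is a subset of previous".
    static boolean canReusePreviousRequest(Set<TopicPartition> previous,
                                           Set<TopicPartition> current) {
        return previous.containsAll(current);
    }

    public static void main(String[] args) {
        Set<TopicPartition> previous = Set.of(new TopicPartition("t", 0), new TopicPartition("t", 1));
        Set<TopicPartition> current = Set.of(new TopicPartition("t", 0));
        System.out.println(canReusePreviousRequest(previous, current)); // true under the new rule
    }
}
{code}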



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2024-09-25 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884827#comment-17884827
 ] 

Kirk True commented on KAFKA-15588:
---

[~frankvicky]—I can't say with any certainty, but I would think most pending 
work pre-fencing should be cleaned up. Feel free to take it. 

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 4.0.0
>
>
> When the member is fenced/failed, we should purge the inflight offset commits 
> and fetches. HeartbeatRequestManager should be able to handle this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics

2024-09-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17581:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: )

> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---
>
> Key: KAFKA-17581
> URL: https://issues.apache.org/jira/browse/KAFKA-17581
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> When a consumer subscribes to an invalid topic name like " this is test", the 
> classic consumer can unsubscribe without error. However, the async consumer 
> can't. We can use the following integration test to validate:
>  
> {code:java}
> @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception : InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
> try consumer.poll(Duration.ofMillis(500)) catch {
>   case e : InvalidTopicException => exception = e
>   case e : Throwable => fail(s"An InvalidTopicException should be thrown. 
> But ${e.getClass} is thrown")
> }
> exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends request in background thread. Wait enough time 
> to send next request.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
> override def execute(): Unit = consumer.unsubscribe()
>   })
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17582) Unpredictable consumer position after transaction abort

2024-09-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17582:
--
Component/s: consumer

> Unpredictable consumer position after transaction abort
> ---
>
> Key: KAFKA-17582
> URL: https://issues.apache.org/jira/browse/KAFKA-17582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, documentation
>Affects Versions: 3.8.0
>Reporter: Kyle Kingsbury
>Priority: Critical
>  Labels: abort, offset, transaction
> Attachments: 20240919T124411.740-0500(1).zip, Screenshot from 
> 2024-09-19 18-45-34.png
>
>
> With the official Kafka Java client, version 3.8.0, the position of consumers 
> after a transaction aborts appears unpredictable. Sometimes the consumer 
> moves on, skipping over the records it polled in the aborted transaction. 
> Sometimes it rewinds to read them again. Sometimes it rewinds *further* than 
> the most recent transaction.
> Since the goal of transactions is to enable "exactly-once semantics", it 
> seems sensible that the consumer should rewind on abort, such that any 
> subsequent transactions would start at the same offsets. Not rewinding leads 
> to data loss, since messages are consumed but their effects are not 
> committed. Rewinding too far is... just weird.
> I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other 
> Kafka-compatible systems.  It occurs without faults, and with a single 
> producer and consumer; no other concurrent processes. Here's the producer and 
> consumer config:
>  
> Producer config: {"socket.connection.setup.timeout.max.ms" 1000, 
> "transactional.id" "jt1", "bootstrap.servers" "n3:9092", "request.timeout.ms" 
> 3000, "enable.idempotence" true, "max.block.ms" 1, "value.serializer" 
> "org.apache.kafka.common.serialization.LongSerializer", "retries" 1000, 
> "key.serializer" "org.apache.kafka.common.serialization.LongSerializer", 
> "socket.connection.setup.timeout.ms" 500, "reconnect.backoff.max.ms" 1000, 
> "delivery.timeout.ms" 1, "acks" "all", "transaction.timeout.ms" 1000}
> Consumer config: {"socket.connection.setup.timeout.max.ms" 1000, 
> "bootstrap.servers" "n5:9092", "request.timeout.ms" 1, 
> "connections.max.idle.ms" 6, "session.timeout.ms" 6000, 
> "heartbeat.interval.ms" 300, "key.deserializer" 
> "org.apache.kafka.common.serialization.LongDeserializer", "group.id" 
> "jepsen-group", "metadata.max.age.ms" 6, "auto.offset.reset" "earliest", 
> "isolation.level" "read_committed", "socket.connection.setup.timeout.ms" 500, 
> "value.deserializer" 
> "org.apache.kafka.common.serialization.LongDeserializer", 
> "enable.auto.commit" false, "default.api.timeout.ms" 1}
>  
> Attached is a test run that shows this behavior, as well as a visualization 
> of the reads (polls) and writes (sends) of a single topic-partition.
> In this plot, time flows down, and offsets run left to right. Each 
> transaction is a single horizontal line. `w1` denotes a send of value 1, and 
> `r2` denotes a poll of read 2. All operations here are performed by the sole 
> process in the system, which has a single Kafka consumer and a single Kafka 
> client. First, a transaction writes 35 and commits. Second, a transaction 
> reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer 
> has clearly re-wound to show the same record twice.
> Then a transaction writes 37. Immediately thereafter a transaction reads 37 
> and 38. Unlike before, it did *not* rewind. This transaction also aborts.
> Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40. 
> This transaction commits! Values 35, 37, and 38 have been lost!
> It doesn't seem possible that this is the effect of a consumer rebalance: 
> rebalancing should start off the consumer at the last *committed* offset, and 
> the last committed offset in this history was actually value 31–it should 
> have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so 
> if the commit were somehow missing, it should have rewound to the start of 
> the topic-partition.
> What... *should* Kafka do with respect to consumer offsets when a transaction 
> aborts? And is there any sort of documentation for this? I've been digging 
> into this problem for almost a week–it manifested as write loss in a Jepsen 
> test--and I'm baffled as to how to proceed.
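
For illustration only, one deterministic policy an application could impose today: after {{abortTransaction()}}, explicitly rewind each assigned partition to its last committed offset so re-polls are predictable. This is a hedged sketch of a client-side workaround, not a statement of what Kafka guarantees:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RewindOnAbortSketch {
    static void rewindToCommitted(KafkaConsumer<Long, Long> consumer) {
        // committed() returns null values for partitions with no committed offset;
        // those fall back to auto.offset.reset semantics on the next poll.
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
        }
    }
}
{code}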



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer

2024-09-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17439:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> Make polling for new records an explicit action/event in the new consumer
> -
>
> Key: KAFKA-17439
> URL: https://issues.apache.org/jira/browse/KAFKA-17439
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> Presently, the new consumer polls the FetchRequestManager many, many times a 
> second and creates fetch requests for any fetchable partitions. In order to 
> more closely mirror how the existing consumer processes fetches, we should 
> align the points at which fetch requests are sent in the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16460) New consumer times out consuming records in multiple consumer_test.py system tests

2024-09-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16460:
-

Assignee: Kirk True  (was: Arpit Goyal)

> New consumer times out consuming records in multiple consumer_test.py system 
> tests
> --
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16460) New consumer times out consuming records in multiple consumer_test.py system tests

2024-09-24 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884399#comment-17884399
 ] 

Kirk True commented on KAFKA-16460:
---

There's a good chance this is a result of a different problem, so I don't want 
you to chase issues that are unrelated. If I find an actionable bug to fix, 
I'll assign it to you.

> New consumer times out consuming records in multiple consumer_test.py system 
> tests
> --
>
> Key: KAFKA-16460
> URL: https://issues.apache.org/jira/browse/KAFKA-16460
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Arpit Goyal
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> The {{consumer_test.py}} system test fails with the following errors:
> {quote}
> * Timed out waiting for consumption
> {quote}
> Affected tests:
> * {{test_broker_failure}}
> * {{test_consumer_bounce}}
> * {{test_static_consumer_bounce}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics

2024-09-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17581:
--
Component/s: clients
 consumer

> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---
>
> Key: KAFKA-17581
> URL: https://issues.apache.org/jira/browse/KAFKA-17581
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> When a consumer subscribes to an invalid topic name like " this is test", the 
> classic consumer can unsubscribe without error. However, the async consumer 
> can't. We can use the following integration test to validate:
>  
> {code:java}
> @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception : InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
> try consumer.poll(Duration.ofMillis(500)) catch {
>   case e : InvalidTopicException => exception = e
>   case e : Throwable => fail(s"An InvalidTopicException should be thrown. 
> But ${e.getClass} is thrown")
> }
> exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends request in background thread. Wait enough time 
> to send next request.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
> override def execute(): Unit = consumer.unsubscribe()
>   })
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17581) AsyncKafkaConsumer can't unsubscribe invalid topics

2024-09-22 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883613#comment-17883613
 ] 

Kirk True commented on KAFKA-17581:
---

[~yangpoan]—this is a good catch! Where in the new consumer is the exception 
thrown? I’d want to look at the existing consumer to see how it handles this 
case.

> AsyncKafkaConsumer can't unsubscribe invalid topics
> ---
>
> Key: KAFKA-17581
> URL: https://issues.apache.org/jira/browse/KAFKA-17581
> Project: Kafka
>  Issue Type: Bug
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> When a consumer subscribes to an invalid topic name like " this is test", the 
> classic consumer can unsubscribe without error. However, the async consumer 
> can't. We can use the following integration test to validate:
>  
> {code:java}
> @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
>   // Invalid topic name due to space
>   val invalidTopicName = "topic abc"
>   val consumer = createConsumer()
>   consumer.subscribe(List(invalidTopicName).asJava)
>   var exception : InvalidTopicException = null
>   TestUtils.waitUntilTrue(() => {
> try consumer.poll(Duration.ofMillis(500)) catch {
>   case e : InvalidTopicException => exception = e
>   case e : Throwable => fail(s"An InvalidTopicException should be thrown. 
> But ${e.getClass} is thrown")
> }
> exception != null
>   }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
>   assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
>   // AsyncKafkaConsumer sends request in background thread. Wait enough time 
> to send next request.
>   Thread.sleep(1000)
>   assertDoesNotThrow(new Executable {
> override def execute(): Unit = consumer.unsubscribe()
>   })
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17554) Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest

2024-09-16 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17554:
--
Component/s: clients

> Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest
> --
>
> Key: KAFKA-17554
> URL: https://issues.apache.org/jira/browse/KAFKA-17554
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: 黃竣陽
>Priority: Major
>  Labels: consumer, flaky-test
>
> Fails with:
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183) at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.testFutureCompletionOutsidePoll(ConsumerNetworkClientTest.java:295)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.ArrayList.forEach(ArrayList.java:1259) at 
> java.util.ArrayList.forEach(ArrayList.java:1259)
>  
> Flaky behaviour started recently (~ Aug 27, 2024): 
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=1726258207388&search.startTimeMin=172110240&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest&tests.test=testFutureCompletionOutsidePoll()]
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17519) Define and validate correctness of Consumer.close() and its timeout when thread is interrupted

2024-09-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881422#comment-17881422
 ] 

Kirk True commented on KAFKA-17519:
---

[~yangpoan]—it's yours if you want it. Thanks!

> Define and validate correctness of Consumer.close() and its timeout when 
> thread is interrupted
> --
>
> Key: KAFKA-17519
> URL: https://issues.apache.org/jira/browse/KAFKA-17519
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The repercussions of a thread's interrupt status on {{Consumer.close()}} and 
> its timeout are not well defined. It _appears_ that the 
> {{ClassicKafkaConsumer}} will continue to attempt to close all its resources 
> even if an interrupt was triggered prior to—or during—the call to {{close()}} 
> though it effectively ignores the user's supplied timeout since each call to 
> {{NetworkClient.poll()}} will throw an {{InterruptException}} after first 
> making an attempt to poll the socket.
> The task here is to review the existing code, verify the behavior with some 
> unit/integration tests, and document it. Furthermore, once the intended 
> behavior has been confirmed, the {{AsyncKafkaConsumer}} should be updated to 
> behave likewise.
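
A sketch of the behavior under review, assuming a subscribed consumer; whether the timeout should be honored is exactly what this ticket is meant to define:

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;

public class InterruptedCloseSketch {
    static void closeWhileInterrupted(KafkaConsumer<String, String> consumer) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving before close()
        try {
            // The ClassicKafkaConsumer appears to keep closing resources, but each
            // NetworkClient.poll() throws, so the 30s budget is effectively ignored.
            consumer.close(Duration.ofSeconds(30));
        } catch (InterruptException e) {
            // Observed today; the ticket asks to verify, document, and mirror this
            // contract in the AsyncKafkaConsumer.
        }
    }
}
{code}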



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17535) Flaky testCloseNoWait in KafkaConsumerTest for Classic Consumer

2024-09-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17535:
--
Component/s: clients

> Flaky testCloseNoWait in KafkaConsumerTest for Classic Consumer 
> 
>
> Key: KAFKA-17535
> URL: https://issues.apache.org/jira/browse/KAFKA-17535
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: flaky-test
>
> Flaky failures due to TimeoutException:
> java.util.concurrent.TimeoutException at 
> java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204) at 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.consumerCloseTest(KafkaConsumerTest.java:2049)
> Flaky in trunk for the classic consumer for a while:
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=17261&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testCloseNoWait(GroupProtocol)%5B1%5D]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout

2024-09-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881421#comment-17881421
 ] 

Kirk True commented on KAFKA-17518:
---

[~frankvicky]—yes, you're free to take it. Thanks!

> AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
> -
>
> Key: KAFKA-17518
> URL: https://issues.apache.org/jira/browse/KAFKA-17518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot 
> complete, leading to the consumer remaining in the consumer group.
> On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the 
> consumer group. This process requires hops back and forth between the 
> application and background threads to call the 
> {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to 
> the close step.
> The events used to communicate between the application and background threads 
> are based on the timeout provided by the user. If the timeout is not 
> sufficient, the events will expire, and the process will be left incomplete.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17536) Ensure clear error message when "new" consumer used with incompatible cluster

2024-09-12 Thread Kirk True (Jira)
Kirk True created KAFKA-17536:
-

 Summary: Ensure clear error message when "new" consumer used with 
incompatible cluster
 Key: KAFKA-17536
 URL: https://issues.apache.org/jira/browse/KAFKA-17536
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Affects Versions: 3.9.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


In Kafka 4.0, by default the consumer uses the updated consumer group protocol 
defined in KIP-848. When the consumer is used with a cluster that does not 
support (or is not configured to use) the new protocol, the consumer will get 
an unfriendly error about unavailable APIs. Since this error could be the 
user's first impression when attempting to upgrade to 4.0, we need to make sure 
that the error is very clear about the remediation steps (set the 
group.protocol to CLASSIC on the client or upgrade and enable the new protocol 
on the cluster).
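
For reference, a sketch of the client-side remediation described above; only the group.protocol setting is the point, and the other properties are illustrative:

{code:java}
import java.util.Properties;

public class ClassicProtocolFallbackSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        // Remediation on the client: opt back into the classic rebalance
        // protocol until the cluster supports/enables the KIP-848 protocol.
        props.put("group.protocol", "CLASSIC");
        return props;
    }
}
{code}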



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17536) Ensure clear error message when "new" consumer used with incompatible cluster

2024-09-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17536:
--
Priority: Critical  (was: Major)

> Ensure clear error message when "new" consumer used with incompatible cluster
> -
>
> Key: KAFKA-17536
> URL: https://issues.apache.org/jira/browse/KAFKA-17536
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> In Kafka 4.0, by default the consumer uses the updated consumer group 
> protocol defined in KIP-848. When the consumer is used with a cluster that 
> does not support (or is not configured to use) the new protocol, the consumer 
> will get an unfriendly error about unavailable APIs. Since this error could 
> be the user's first impression when attempting to upgrade to 4.0, we need to 
> make sure that the error is very clear about the remediation steps (set the 
> group.protocol to CLASSIC on the client or upgrade and enable the new 
> protocol on the cluster).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17519) Define behavior of Consumer.close() and its timeout when thread is interrupted

2024-09-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17519:
--
Summary: Define behavior of Consumer.close() and its timeout when thread is 
interrupted  (was: Review intended behavior of Consumer.close() and its timeout 
on interrupt)

> Define behavior of Consumer.close() and its timeout when thread is interrupted
> --
>
> Key: KAFKA-17519
> URL: https://issues.apache.org/jira/browse/KAFKA-17519
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The repercussions of a thread's interrupt status on {{Consumer.close()}} and 
> its timeout are not well defined. It _appears_ that the 
> {{ClassicKafkaConsumer}} will continue to attempt to close all its resources 
> even if an interrupt was triggered prior to—or during—the call to {{close()}} 
> though it effectively ignores the user's supplied timeout since each call to 
> {{NetworkClient.poll()}} will throw an {{InterruptException}} after first 
> making an attempt to poll the socket.
> The task here is to review the existing code, verify the behavior with some 
> unit/integration tests, and document it. Furthermore, once the intended 
> behavior has been confirmed, the {{AsyncKafkaConsumer}} should be updated to 
> behave likewise.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout

2024-09-10 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880817#comment-17880817
 ] 

Kirk True edited comment on KAFKA-17518 at 9/11/24 12:32 AM:
-

This issue was found during [review|https://github.com/apache/kafka/pull/16686] 
and testing for KAFKA-16985.


was (Author: kirktrue):
This issue was found during 
[review|[https://github.com/apache/kafka/pull/16686]] and testing for 
KAFKA-16985.

> AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
> -
>
> Key: KAFKA-17518
> URL: https://issues.apache.org/jira/browse/KAFKA-17518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot 
> complete, leading to the consumer remaining in the consumer group.
> On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the 
> consumer group. This process requires hops back and forth between the 
> application and background threads to call the 
> {{{}ConsumerRebalanceListener{}}}. Those hops add a nonzero amount of time to 
> the close step.
> The events used to communicate between the application and background threads 
> are based on the timeout provided by the user. If the timeout is not 
> sufficient, the events will expire, and the process will be left incomplete.
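> For illustration, a minimal sketch of the problematic call (whether the 
> leave-group request completes is exactly what this issue describes):
> {code:java}
> import java.time.Duration;
> 
> // With a zero timeout, the hops between the application and background
> // threads described above cannot complete, so the member may be left in
> // the group:
> consumer.close(Duration.ZERO);
> 
> // A more generous timeout gives the background thread a chance to run the
> // ConsumerRebalanceListener callbacks and send the leave-group request:
> // consumer.close(Duration.ofSeconds(30));
> {code}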



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout

2024-09-10 Thread Kirk True (Jira)
Kirk True created KAFKA-17518:
-

 Summary: AsyncKafkaConsumer cannot reliably leave group when 
closed with small timeout
 Key: KAFKA-17518
 URL: https://issues.apache.org/jira/browse/KAFKA-17518
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.9.0
Reporter: Kirk True


If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot 
complete, leading to the consumer remaining in the consumer group.

On {{{}close(){}}}, the consumer attempts to unsubscribe and leave the consumer 
group. This process requires hops back and forth between the application and 
background threads to call the {{{}ConsumerRebalanceListener{}}}. Those hops 
add a nonzero amount of time to the close step.

The events used to communicate between the application and background threads 
are based on the timeout provided by the user. If the timeout is not 
sufficient, the events will expire, and the process will be left incomplete.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7

2024-09-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16138:
--
Fix Version/s: 4.0.0

> QuotaTest system test fails consistently in 3.7
> ---
>
> Key: KAFKA-16138
> URL: https://issues.apache.org/jira/browse/KAFKA-16138
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Stanislav Kozlovski
>Assignee: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> as mentioned in 
> [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,]
>  the test fails consistently:
> {code:java}
> ValueError('max() arg is an empty sequence')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 169, in test_quota
> success, msg = self.validate(self.kafka, producer, consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 197, in validate
> metric.value for k, metrics in producer.metrics(group='producer-metrics', 
> name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16138) QuotaTest system test fails consistently in 3.7

2024-09-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16138:
-

Assignee: Kirk True  (was: Philip Nee)

> QuotaTest system test fails consistently in 3.7
> ---
>
> Key: KAFKA-16138
> URL: https://issues.apache.org/jira/browse/KAFKA-16138
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Stanislav Kozlovski
>Assignee: Kirk True
>Priority: Major
>
> as mentioned in 
> [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,]
>  the test fails consistently:
> {code:java}
> ValueError('max() arg is an empty sequence')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 169, in test_quota
> success, msg = self.validate(self.kafka, producer, consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 197, in validate
> metric.value for k, metrics in producer.metrics(group='producer-metrics', 
> name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17448) New consumer seek should update positions in background thread

2024-09-05 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879653#comment-17879653
 ] 

Kirk True commented on KAFKA-17448:
---

[~yangpoan]—sorry for the nag, but can you mark this as Patch Available since 
the PR is out for review? Thanks!

> New consumer seek should update positions in background thread
> --
>
> Key: KAFKA-17448
> URL: https://issues.apache.org/jira/browse/KAFKA-17448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0, 3.7.1
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> In the new AsyncKafkaConsumer, a call to seek will update the positions in 
> subscription state for the assigned partitions in the app thread 
> ([https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L796])
> This could lead to race conditions like we've seen before, where the 
> subscription state is changed in the app thread (over a set of assigned 
> partitions) that may have been concurrently modified in the background 
> thread, leading to "No current assignment for partition" errors: 
> [https://github.com/apache/kafka/blob/c23b6b0365af5c58b76d8ad3fb628f766f95348f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L378]
>  
> Also, the positions update was moved to the background thread with 
> KAFKA-17066 for the same reason, so even if the assignment does not change, 
> we could have a race between the background thread setting positions to the 
> committed offsets, for instance, and the app thread setting them manually 
> via seek. 
> To avoid all of the above, seek should generate an event, send it to the 
> background thread, and update the subscription state only when processing 
> that event (similar to other API calls, e.g. assign with KAFKA-17064)
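> A rough sketch of that shape ({{SeekEvent}} and the event-handler wiring 
> are hypothetical names for this sketch; {{SubscriptionState.seek()}} is the 
> existing mutation):
> {code:java}
> // Application thread: enqueue an event instead of mutating the
> // subscription state directly.
> public void seek(TopicPartition partition, long offset) {
>     // SeekEvent is a hypothetical event type; block on its completion if
>     // the seek must be applied before returning to the caller.
>     applicationEventHandler.add(new SeekEvent(partition, offset));
> }
> 
> // Background thread: the single place where subscription state is
> // mutated, removing the app-thread/background-thread race described above.
> private void process(SeekEvent event) {
>     subscriptions.seek(event.partition(), event.offset());
> }
> {code}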



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16138:
--
Component/s: system tests

> QuotaTest system test fails consistently in 3.7
> ---
>
> Key: KAFKA-16138
> URL: https://issues.apache.org/jira/browse/KAFKA-16138
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Stanislav Kozlovski
>Priority: Major
>
> as mentioned in 
> [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,]
>  the test fails consistently:
> {code:java}
> ValueError('max() arg is an empty sequence')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 169, in test_quota
> success, msg = self.validate(self.kafka, producer, consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 197, in validate
> metric.value for k, metrics in producer.metrics(group='producer-metrics', 
> name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16138) QuotaTest system test fails consistently in 3.7

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16138:
-

Assignee: Philip Nee

> QuotaTest system test fails consistently in 3.7
> ---
>
> Key: KAFKA-16138
> URL: https://issues.apache.org/jira/browse/KAFKA-16138
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Stanislav Kozlovski
>Assignee: Philip Nee
>Priority: Major
>
> as mentioned in 
> [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,]
>  the test fails consistently:
> {code:java}
> ValueError('max() arg is an empty sequence')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 169, in test_quota
> success, msg = self.validate(self.kafka, producer, consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 197, in validate
> metric.value for k, metrics in producer.metrics(group='producer-metrics', 
> name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16138) QuotaTest system test fails consistently in 3.7

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16138:
--
Component/s: clients
 consumer

> QuotaTest system test fails consistently in 3.7
> ---
>
> Key: KAFKA-16138
> URL: https://issues.apache.org/jira/browse/KAFKA-16138
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Stanislav Kozlovski
>Priority: Major
>
> as mentioned in 
> [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,]
>  the test fails consistently:
> {code:java}
> ValueError('max() arg is an empty sequence')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 169, in test_quota
> success, msg = self.validate(self.kafka, producer, consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py",
>  line 197, in validate
> metric.value for k, metrics in producer.metrics(group='producer-metrics', 
> name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
> ValueError: max() arg is an empty sequence {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17466) Revisit failAndRemoveExpiredCommitRequests method

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17466:
--
Component/s: clients
 consumer

> Revisit failAndRemoveExpiredCommitRequests method
> -
>
> Key: KAFKA-17466
> URL: https://issues.apache.org/jira/browse/KAFKA-17466
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: TengYao Chi
>Assignee: TengYao Chi
>Priority: Minor
>
> see discussion: 
> [https://github.com/apache/kafka/pull/16833#issuecomment-2301334628]
>  
> In short, we should consider removing the 
> `failAndRemoveExpiredCommitRequests` method since its functionality is 
> already handled elsewhere.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17456) Make sure FindCoordinatorResponse get created before consumer

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17456:
--
Component/s: clients
 consumer

> Make sure FindCoordinatorResponse get created before consumer
> -
>
> Key: KAFKA-17456
> URL: https://issues.apache.org/jira/browse/KAFKA-17456
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Major
>
> The incorrect ordering could lead to flaky tests (see KAFKA-17092 and 
> KAFKA-17395). It would be nice to fix all of them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-09-03 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879020#comment-17879020
 ] 

Kirk True commented on KAFKA-17338:
---

[~m1a2st]—I assigned this to you and updated the status to "Patch Available" 
given that there's a PR under review.

Thanks for your help!

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: 黃竣陽
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.
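> A sketch of the validation described above (the config constants exist in 
> {{ConsumerConfig}}; the error wording is illustrative):
> {code:java}
> // Sketch of the additional check in postProcessParsedConfig():
> String protocol = (String) parsedValues.get(GROUP_PROTOCOL_CONFIG);
> 
> @SuppressWarnings("unchecked")
> List<String> assignors =
>     (List<String>) parsedValues.get(PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
> 
> if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(protocol)
>         && assignors != null && !assignors.isEmpty()) {
>     throw new ConfigException(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>         assignors,
>         "Partition assignors must not be set when group.protocol=consumer");
> }
> {code}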



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17338:
--
Fix Version/s: 4.0.0

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: 黃竣陽
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-17338:
-

Assignee: 黃竣陽  (was: Kirk True)

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: 黃竣陽
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17116:
--
Labels: kip-848-client-support needs-kip  (was: kip-848-client-support)

> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support, needs-kip
> Fix For: 4.0.0
>
>
> If the new consumer is closed after sending a HB to join, but before 
> receiving the response to it, it will send a leave group request without a 
> member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker will have registered a new member for which it will never receive a 
> leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> Note that, with the current logic, the expected outcome and the main 
> downsides are:
>  # If the member received partitions on the first HB, those partitions 
> won't be re-assigned (the broker waits for the closed consumer to reconcile 
> them) until the rebalance timeout expires. 
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker's point of view (but not from the client's POV). The 
> member will eventually be kicked out for not sending HBs, but only when its 
> session timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-09-03 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878956#comment-17878956
 ] 

Kirk True commented on KAFKA-17338:
---

[~m1a2st] do you want to assign this to yourself, given that you have already 
submitted a PR? Thanks!

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17453:
--
Component/s: clients
 consumer

> Fix flaky 
> PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
> ---
>
> Key: KAFKA-17453
> URL: https://issues.apache.org/jira/browse/KAFKA-17453
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> Error: java.util.NoSuchElementException
> Stacktrace:
> java.util.NoSuchElementException
>at 
> org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) 
>at 
> kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104)
> at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  
> at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)  
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at java.util.ArrayList.forEach(ArrayList.java:1259) at 
> java.util.ArrayList.forEach(ArrayList.java:1259){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17453) Fix flaky PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17453:
--
Labels: integration-test  (was: )

> Fix flaky 
> PlaintextConsumerFetchTest#testFetchOutOfRangeOffsetResetConfigLatest
> ---
>
> Key: KAFKA-17453
> URL: https://issues.apache.org/jira/browse/KAFKA-17453
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: integration-test
>
> {code:java}
> Error: java.util.NoSuchElementException
> Stacktrace:
> java.util.NoSuchElementException
>at 
> org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) 
>at 
> kafka.api.PlaintextConsumerFetchTest.testFetchOutOfRangeOffsetResetConfigLatest(PlaintextConsumerFetchTest.scala:104)
> at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)  
> at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)  
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) 
>at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) 
>   at java.util.ArrayList.forEach(ArrayList.java:1259) at 
> java.util.ArrayList.forEach(ArrayList.java:1259){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17451) Remove deprecated Consumer#committed

2024-09-03 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17451:
--
Component/s: clients
 consumer

> Remove deprecated Consumer#committed
> 
>
> Key: KAFKA-17451
> URL: https://issues.apache.org/jira/browse/KAFKA-17451
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Ming-Yen Chung
>Priority: Major
>
> The APIs were deprecated by KAFKA-8880, back in 2.4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE

2024-08-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877943#comment-17877943
 ] 

Kirk True commented on KAFKA-17335:
---

Thanks [~chia7712]!!!

> Lack of default for URL encoding configuration for OAuth causes NPE
> ---
>
> Key: KAFKA-17335
> URL: https://issues.apache.org/jira/browse/KAFKA-17335
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: OAuth, oauth
> Fix For: 4.0.0, 3.9.0
>
>
> KAFKA-16345 added a new client configuration option 
> {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, 
> so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, 
> {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the 
> default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value 
> isn't present.
> However, if the configuration is created as a plain {{Map}} or {{Properties}} 
> and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead 
> to a {{{}NullPointerException{}}}. This occurs because the code in 
> {{AccessTokenRetriever.create()}} assumes that there's always a value present 
> in the incoming {{configs}} parameter. But if there isn't an entry for the 
> {{sasl.oauthbearer.header.urlencode}} key in the map, a 
> {{NullPointerException}} is thrown.
> When using map-based configuration, one workaround is to explicitly add an 
> entry to the map like so:
> {code:java}
> Map<String, Object> configs = new HashMap<>();
> . . .
> configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, 
> DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
> . . .
> configureSomething(configs);{code}
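> The underlying fix is similarly small; a sketch of a null-safe read 
> (assuming, as with the parsed configs, that a present value is a 
> {{Boolean}}):
> {code:java}
> // Treat a missing entry as the documented default instead of unboxing
> // null, which is what currently throws the NullPointerException:
> Object configured = configs.get(SASL_OAUTHBEARER_HEADER_URLENCODE);
> boolean urlencode = configured == null
>     ? DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE
>     : (Boolean) configured;
> {code}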



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-08-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16623:
--
Fix Version/s: 4.0.0
   (was: 3.9.0)

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 4.0.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use 
> group instance IDs: the initial set and the "conflict" set. It appears that 
> one of the "conflicting" consumers hijacks the partition ownership from the 
> coordinator's perspective when the initial consumer leaves the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-08-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877879#comment-17877879
 ] 

Kirk True commented on KAFKA-16623:
---

[~cmccabe]—this is not a blocker for 3.9.0, no. I'll move it to 4.0. Thanks!

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.9.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use 
> group instance IDs: the initial set and the "conflict" set. It appears that 
> one of the "conflicting" consumers hijacks the partition ownership from the 
> coordinator's perspective when the initial consumer leaves the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE

2024-08-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17335:
--
Fix Version/s: (was: 3.9.0)

> Lack of default for URL encoding configuration for OAuth causes NPE
> ---
>
> Key: KAFKA-17335
> URL: https://issues.apache.org/jira/browse/KAFKA-17335
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: OAuth, oauth
> Fix For: 4.0.0
>
>
> KAFKA-16345 added a new client configuration option 
> {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, 
> so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, 
> {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the 
> default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value 
> isn't present.
> However, if the configuration is created as a plain {{Map}} or {{Properties}} 
> and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead 
> to a {{{}NullPointerException{}}}. This occurs because the code in 
> {{AccessTokenRetriever.create()}} assumes that there's always a value present 
> in the incoming {{configs}} parameter. But if there isn't an entry for the 
> {{sasl.oauthbearer.header.urlencode}} key in the map, a 
> {{NullPointerException}} is thrown.
> When using map-based configuration, one workaround is to explicitly add an 
> entry to the map like so:
> {code:java}
> Map<String, Object> configs = new HashMap<>();
> . . .
> configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, 
> DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
> . . .
> configureSomething(configs);{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer

2024-08-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17439:
--
Fix Version/s: 4.0.0

> Make polling for new records an explicit action/event in the new consumer
> -
>
> Key: KAFKA-17439
> URL: https://issues.apache.org/jira/browse/KAFKA-17439
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> Presently, the new consumer polls the FetchRequestManager many times per 
> second and creates fetch requests for any fetchable partitions. To more 
> closely mirror how the existing consumer processes fetches, polling for 
> records should become an explicit action/event that determines the points 
> at which fetch requests are sent.
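> One possible shape (sketch only; the event type and helper names are 
> hypothetical):
> {code:java}
> // Application thread, inside poll(Duration): explicitly signal that the
> // application is consuming, rather than letting the background thread
> // fetch continuously on its own schedule.
> applicationEventHandler.add(new PollEvent(time.milliseconds()));
> 
> // Background thread: only create fetch requests when a poll event has
> // been observed recently, mirroring the classic consumer's fetch cadence.
> if (pollEventObservedSince(lastFetchTimeMs)) {
>     fetchRequestManager.createFetchRequests();
> }
> {code}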



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer

2024-08-28 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17439:
--
Affects Version/s: 3.7.0

> Make polling for new records an explicit action/event in the new consumer
> -
>
> Key: KAFKA-17439
> URL: https://issues.apache.org/jira/browse/KAFKA-17439
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> Presently, the new consumer polls the FetchRequestManager many times per 
> second and creates fetch requests for any fetchable partitions. To more 
> closely mirror how the existing consumer processes fetches, polling for 
> records should become an explicit action/event that determines the points 
> at which fetch requests are sent.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17439) Make polling for new records an explicit action/event in the new consumer

2024-08-28 Thread Kirk True (Jira)
Kirk True created KAFKA-17439:
-

 Summary: Make polling for new records an explicit action/event in 
the new consumer
 Key: KAFKA-17439
 URL: https://issues.apache.org/jira/browse/KAFKA-17439
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


Presently, the new consumer polls the FetchRequestManager many times per 
second and creates fetch requests for any fetchable partitions. To more 
closely mirror how the existing consumer processes fetches, polling for 
records should become an explicit action/event that determines the points at 
which fetch requests are sent.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14830) Illegal state error in transactional producer

2024-08-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877221#comment-17877221
 ] 

Kirk True commented on KAFKA-14830:
---

cc [~mjsax] 

> Illegal state error in transactional producer
> -
>
> Key: KAFKA-14830
> URL: https://issues.apache.org/jira/browse/KAFKA-14830
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.1.2
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Critical
>  Labels: transactions
> Fix For: 4.0.0
>
>
> We have seen the following illegal state error in the producer:
> {code:java}
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-0:120027 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-1:120026 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Aborting 
> incomplete transaction
> [Producer clientId=client-id2, transactionalId=transactional-id] Invoking 
> InitProducerId with current producer ID and epoch 
> ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch
> [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId 
> set to 191799 with epoch 1
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.NetworkException: Disconnected from node 4
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught 
> error in request completion:
> java.lang.IllegalStateException: TransactionalId transactional-id: Invalid 
> transition attempted from state READY to state ABORTABLE_ERROR
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  {code}
> The producer hits timeouts which cause it to abort an active transaction. 
> After aborting, the producer bumps its epoch, which transitions it back to 
> the `READY` state. Following this, there are two errors for inflight 
> requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But 
> how could the transaction ABORT complete if there were still inflight 
> requests? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14830) Illegal state error in transactional producer

2024-08-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877220#comment-17877220
 ] 

Kirk True commented on KAFKA-14830:
---

[~jolshan] [~alivshits] [~calvinliu]—I have looked into a number of Jiras, 
reports, and logs about the “{{{}Invalid transition attempted from state X to 
state Y{}}}” that intermittently plagues the transaction manager.

My hypothesis is that there's a race condition between the {{Sender}} thread 
and the application thread when handling transaction errors. When an error 
occurs, the {{Sender}} thread calls {{TransactionManager.handleFailedBatch()}} 
which updates the state to {{{}ABORTABLE_ERROR{}}}. However, this state 
mutation attempt occurs *_after_* the application thread has already 
successfully aborted (and thus transitioned the state to {{{}READY{}}}) or has 
hard failed (setting the state to {{{}FATAL_ERROR{}}}).

I have a draft PR (linked above) which tries to handle some of these cases in 
{{{}handleFailedBatch(){}}}. Note: my confidence level on the "fix" is very 
low, but I'd love to get feedback on the PR and/or comments on this Jira.

Thanks!
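
For concreteness, the shape of the guard being explored (a sketch only, using 
the existing state names; not the final fix):

{code:java}
// Sketch: in TransactionManager.handleFailedBatch(), skip the error
// transition when the application thread has already moved the state
// machine on.
synchronized (this) {
    if (currentState == State.READY || currentState == State.FATAL_ERROR) {
        // The transaction was already aborted (READY) or hard-failed
        // (FATAL_ERROR) on the application thread; transitioning this
        // late-arriving batch failure to ABORTABLE_ERROR would be the
        // invalid transition seen in the logs.
        log.debug("Ignoring failed batch {} in state {}", batch, currentState);
        return;
    }
    transitionToAbortableError(exception);
}
{code}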

> Illegal state error in transactional producer
> -
>
> Key: KAFKA-14830
> URL: https://issues.apache.org/jira/browse/KAFKA-14830
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.1.2
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Critical
>  Labels: transactions
> Fix For: 4.0.0
>
>
> We have seen the following illegal state error in the producer:
> {code:java}
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-0:120027 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-1:120026 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Aborting 
> incomplete transaction
> [Producer clientId=client-id2, transactionalId=transactional-id] Invoking 
> InitProducerId with current producer ID and epoch 
> ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch
> [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId 
> set to 191799 with epoch 1
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.NetworkException: Disconnected from node 4
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught 
> error in request completion:
> java.lang.IllegalStateException: TransactionalId transactional-id: Invalid 
> transition attempted from state READY to state ABORTABLE_ERROR
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  {code}
> The producer hits timeouts which cause it to abort an active transaction. 
> After aborting, the producer bumps its epoch, which transitions it back to 
> the `READY` state. Following this, there are two errors for inflight 
> requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But 
> how could the transaction ABORT complete if there were still inflight 
> requests? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-08-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877042#comment-17877042
 ] 

Kirk True commented on KAFKA-16792:
---

{{AsyncKafkaConsumer.poll()}} should indirectly cause a 
{{FindCoordinatorRequest}} to be sent in the background thread. When you state 
that “we don't guarantee” that it will, that sounds like a bug. :(

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted

2024-08-27 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877037#comment-17877037
 ] 

Kirk True commented on KAFKA-17397:
---

[~goyarpit]—yes, feel free to work on it :)

> Ensure ClassicKafkaConsumer sends leave request on close even if interrupted
> 
>
> Key: KAFKA-17397
> URL: https://issues.apache.org/jira/browse/KAFKA-17397
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0, 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: integration-tests
>
> During testing for KAFKA-16985, a new, parameterized integration test was 
> added to {{PlaintextConsumerTest}} named 
> {{{}testCloseLeavesGroupOnInterrupt(){}}}. When the test is executed locally, 
> it passes using both the {{AsyncKafkaConsumer}} and the 
> {{{}ClassicKafkaConsumer{}}}. However, when the test is run in the Apache CI 
> environment, it passes for the {{AsyncKafkaConsumer}} but fails for the 
> {{{}ClassicKafkaConsumer{}}}. Rather than hold up KAFKA-16985, this Jira was 
> filed to investigate and fix the {{{}ClassicKafkaConsumer{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17421) Add IT for ConsumerRecord#leaderEpoch

2024-08-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17421:
--
Component/s: clients
 consumer

> Add IT for ConsumerRecord#leaderEpoch
> -
>
> Key: KAFKA-17421
> URL: https://issues.apache.org/jira/browse/KAFKA-17421
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Chuan Yu
>Priority: Minor
>
> The test should include the following checks:
> 1. the leader epoch is not empty
> 2. it gets updated after the leader gets changed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17421) Add IT for ConsumerRecord#leaderEpoch

2024-08-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17421:
--
Labels: integration-test  (was: )

> Add IT for ConsumerRecord#leaderEpoch
> -
>
> Key: KAFKA-17421
> URL: https://issues.apache.org/jira/browse/KAFKA-17421
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Chuan Yu
>Priority: Minor
>  Labels: integration-test
>
> The test should include the following checks:
> 1. the leader epoch is not empty
> 2. it gets updated after the leader gets changed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17182) Consumer fetch sessions are evicted too quickly

2024-08-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17182:
--
Summary: Consumer fetch sessions are evicted too quickly  (was: Consumer’s 
fetch sessions are evicted too quickly)

> Consumer fetch sessions are evicted too quickly
> ---
>
> Key: KAFKA-17182
> URL: https://issues.apache.org/jira/browse/KAFKA-17182
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> In stress testing the new consumer, the new consumer is evicting fetch 
> sessions on the broker much more frequently than expected. There is an 
> ongoing investigation into this behavior, but it appears to stem from a race 
> condition due to the design of the new consumer.
> In the background thread, fetch requests are sent in a near continuous 
> fashion for partitions that are "fetchable." A timing bug appears to cause 
> partitions to be "unfetchable," which then causes them to end up in the 
> "removed" set of partitions. The broker then removes them from the fetch 
> session, which causes the number of remaining partitions for that session to 
> drop below a threshold that allows it to be evicted by another competing 
> session. Within a few milliseconds, though, the partitions become "fetchable" 
> again, and are added to the "added" set of partitions on the next fetch 
> request. This causes thrashing on both the client and broker sides as both 
> are handling a steady stream of evictions, which negatively affects 
> consumption throughput.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE

2024-08-23 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876399#comment-17876399
 ] 

Kirk True commented on KAFKA-17335:
---

No worries, [~bachmanity1]!

Regarding 3.9.0, I think we should ask the release manager on the mailing list.

> Lack of default for URL encoding configuration for OAuth causes NPE
> ---
>
> Key: KAFKA-17335
> URL: https://issues.apache.org/jira/browse/KAFKA-17335
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: OAuth, oauth
> Fix For: 4.0.0
>
>
> KAFKA-16345 added a new client configuration option 
> {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, 
> so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, 
> {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the 
> default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value 
> isn't present.
> However, if the configuration is created as a plain {{Map}} or {{Properties}} 
> and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead 
> to a {{{}NullPointerException{}}}. This occurs because the code in 
> {{AccessTokenRetriever.create()}} assumes that there's always a value present 
> in the incoming {{configs}} parameter. But if there isn't an entry for the 
> {{sasl.oauthbearer.header.urlencode}} key in the map, a 
> {{NullPointerException}} is thrown.
> When using map-based configuration, one workaround is to explicitly add an 
> entry to the map like so:
> {code:java}
> Map<String, Object> configs = new HashMap<>();
> . . .
> configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, 
> DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
> . . .
> configureSomething(configs);{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17335) Lack of default for URL encoding configuration for OAuth causes NPE

2024-08-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17335:
--
Summary: Lack of default for URL encoding configuration for OAuth causes 
NPE  (was: Lack of default for missing URL encoding option for OAuth causes NPE)

> Lack of default for URL encoding configuration for OAuth causes NPE
> ---
>
> Key: KAFKA-17335
> URL: https://issues.apache.org/jira/browse/KAFKA-17335
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, security
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: OAuth, oauth
> Fix For: 4.0.0
>
>
> KAFKA-16345 added a new client configuration option 
> {{{}SASL_OAUTHBEARER_HEADER_URLENCODE{}}}. This is an optional configuration, 
> so the user doesn't need to provide it. When an {{{}AdminConfig{}}}, 
> {{{}ConsumerConfig{}}}, or {{ProducerConfig}} object is created, it uses the 
> default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value 
> isn't present.
> However, if the configuration is created as a plain {{Map}} or {{Properties}} 
> and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead 
> to a {{{}NullPointerException{}}}. This occurs because the code in 
> {{AccessTokenRetriever.create()}} assumes that there's always a value present 
> in the incoming {{configs}} parameter. But if there isn't an entry for the 
> {{sasl.oauthbearer.header.urlencode}} key in the map, a 
> {{NullPointerException}} is thrown.
> When using map-based configuration, one workaround is to explicitly add an 
> entry to the map like so:
> {code:java}
> Map<String, Object> configs = new HashMap<>();
> . . .
> configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, 
> DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
> . . .
> configureSomething(configs);{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17182) Consumer’s fetch sessions are evicted too quickly

2024-08-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17182:
--
Summary: Consumer’s fetch sessions are evicted too quickly  (was: 
Consumer's fetch sessions are evicted too quickly)

> Consumer’s fetch sessions are evicted too quickly
> -
>
> Key: KAFKA-17182
> URL: https://issues.apache.org/jira/browse/KAFKA-17182
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> In stress testing the new consumer, the new consumer is evicting fetch 
> sessions on the broker much more frequently than expected. There is an 
> ongoing investigation into this behavior, but it appears to stem from a race 
> condition due to the design of the new consumer.
> In the background thread, fetch requests are sent in a near continuous 
> fashion for partitions that are "fetchable." A timing bug appears to cause 
> partitions to be "unfetchable," which then causes them to end up in the 
> "removed" set of partitions. The broker then removes them from the fetch 
> session, which causes the number of remaining partitions for that session to 
> drop below a threshold that allows it to be evicted by another competing 
> session. Within a few milliseconds, though, the partitions become "fetchable" 
> again, and are added to the "added" set of partitions on the next fetch 
> request. This causes thrashing on both the client and broker sides as both 
> are handling a steady stream of evictions, which negatively affects 
> consumption throughput.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe

2024-08-21 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875692#comment-17875692
 ] 

Kirk True commented on KAFKA-17377:
---

[~yangpoan]—your suggestion makes a lot of sense.

[~lianetm] [~pnee]—do you have any thoughts on this?
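
A rough sketch of what that would look like ({{time}}, {{defaultApiTimeoutMs}}, 
and {{calculateDeadlineMs}} are assumed names, not verified against the current 
code):
{code:java}
// Today (conceptually): an effectively unbounded deadline.
//   long deadlineMs = calculateDeadlineMs(time, Long.MAX_VALUE);
// Suggested: bound the unsubscribe deadline by default.api.timeout.ms instead.
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
{code}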

> Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
> 
>
> Key: KAFKA-17377
> URL: https://issues.apache.org/jira/browse/KAFKA-17377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> This is followup from 
> [https://github.com/apache/kafka/pull/16673#discussion_r1717009724]
> AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. 
> However, most of AsyncKafkaConsumer operations use 
> [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms]
>  if users don't specify a timeout. In [design 
> document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts],
>  it also mentions that using default.api.timeout.ms as default timeout.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17377:
--
Labels: refactor  (was: )

> Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
> 
>
> Key: KAFKA-17377
> URL: https://issues.apache.org/jira/browse/KAFKA-17377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>  Labels: refactor
>
> This is followup from 
> [https://github.com/apache/kafka/pull/16673#discussion_r1717009724]
> AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. 
> However, most of AsyncKafkaConsumer operations use 
> [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms]
>  if users don't specify a timeout. In [design 
> document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts],
>  it also mentions that using default.api.timeout.ms as default timeout.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17377:
--
Labels: consumer-threading-refactor  (was: refactor)

> Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
> 
>
> Key: KAFKA-17377
> URL: https://issues.apache.org/jira/browse/KAFKA-17377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> This is followup from 
> [https://github.com/apache/kafka/pull/16673#discussion_r1717009724]
> AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. 
> However, most of AsyncKafkaConsumer operations use 
> [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms]
>  if users don't specify a timeout. In [design 
> document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts],
>  it also mentions that using default.api.timeout.ms as default timeout.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17377) Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17377:
--
Component/s: clients
 consumer

> Consider using defaultApiTimeoutMs in AsyncKafkaConsumer#unsubscribe
> 
>
> Key: KAFKA-17377
> URL: https://issues.apache.org/jira/browse/KAFKA-17377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> This is followup from 
> [https://github.com/apache/kafka/pull/16673#discussion_r1717009724]
> AsyncKafkaConsumer#unsubscribe uses Long.MAX_VALUE to calculate deadline. 
> However, most of AsyncKafkaConsumer operations use 
> [default.api.timeout.ms|https://kafka.apache.org/documentation/#consumerconfigs_default.api.timeout.ms]
>  if users don't specify a timeout. In [design 
> document|https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts#JavaclientConsumertimeouts-DefaultTimeouts],
>  it also mentions that using default.api.timeout.ms as default timeout.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17395) Flaky test testMissingOffsetNoResetPolicy for new consumer

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17395:
--
Labels: consumer-threading-refactor flaky-test  (was: 
consumer-threading-refactor)

> Flaky test testMissingOffsetNoResetPolicy for new consumer
> --
>
> Key: KAFKA-17395
> URL: https://issues.apache.org/jira/browse/KAFKA-17395
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testMissingOffsetNoResetPolicy is flaky when running for 
> the new consumer (passing consistently for the classic consumer). 
> Fails with: 
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Consumer was not able to update fetch positions on continuous calls with 0 
> timeout ==> expected:  but was: 
> It's been flaky since it was enabled for the new consumer with 
> [https://github.com/apache/kafka/pull/16587] 
> See last couple of month runs on trunk showing the flakiness: 
> [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172429919&search.startTimeMin=171721440&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testMissingOffsetNoResetPolicy(GroupProtocol)%5B2%5D]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted

2024-08-21 Thread Kirk True (Jira)
Kirk True created KAFKA-17397:
-

 Summary: Ensure ClassicKafkaConsumer sends leave request on close 
even if interrupted
 Key: KAFKA-17397
 URL: https://issues.apache.org/jira/browse/KAFKA-17397
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.8.0, 3.9.0
Reporter: Kirk True


During testing for KAFKA-16985, a new, parameterized integration test was added 
to {{PlaintextConsumerTest}} named {{{}testCloseLeavesGroupOnInterrupt(){}}}. 
When the test is executed locally, it passes using both the 
{{AsyncKafkaConsumer}} and the {{{}ClassicKafkaConsumer{}}}. However, when the 
test is run in the Apache CI environment, it passes for the 
{{AsyncKafkaConsumer}} but fails for the {{{}ClassicKafkaConsumer{}}}. Rather 
than hold up KAFKA-16985, this Jira was filed to investigate and fix the 
{{{}ClassicKafkaConsumer{}}}.
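
The scenario under test looks roughly like this (a sketch of the shape of 
{{testCloseLeavesGroupOnInterrupt()}}, not the actual test code):
{code:java}
consumer.subscribe(Collections.singletonList("topic"));
consumer.poll(Duration.ofSeconds(5));   // join the group

Thread.currentThread().interrupt();     // simulate an interrupt before close
try {
    consumer.close();                   // should still send a LeaveGroup request
} catch (InterruptException expected) {
    // close() may surface the interrupt status as an InterruptException
}
// ...then assert (e.g. via the group coordinator) that the member actually left
{code}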



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16985) Ensure consumer sends leave request on close even if interrupted

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16985:
--
Fix Version/s: 4.0.0
   (was: 3.9.0)

> Ensure consumer sends leave request on close even if interrupted
> 
>
> Key: KAFKA-16985
> URL: https://issues.apache.org/jira/browse/KAFKA-16985
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> While running some stress tests, we found that consumers were not leaving the 
> group if interrupted while closing (leading to members eventually getting 
> fenced after the session expired).
> Unsubscribe event to be handled in the background, but we noticed that the 
> network thread failed with the interruption, seemingly not sending the unsent 
> requests. We should review this to ensure that a member does a clean leave, 
> notifying the coordinator with a leave HB, even if in a fire-and-forget mode 
> in the case of interruption (and validate the legacy consumer behaviour in 
> this scenario).  (Still under investigation, I'll update more info as I 
> discover it)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16443) Update streams_static_membership_test.py to support KIP-848’s group protocol config

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16443.
---
Resolution: Won't Fix

This Jira was created prior to getting input from the Kafka Streams developers 
regarding KIP-848 changes. They've stated that this is not the approach they'll 
take, so closing as won't fix.

> Update streams_static_membership_test.py to support KIP-848’s group protocol 
> config
> ---
>
> Key: KAFKA-16443
> URL: https://issues.apache.org/jira/browse/KAFKA-16443
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_static_membership_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrices.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16441) Update streams_broker_down_resilience_test.py to support KIP-848’s group protocol config

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16441.
---
Resolution: Won't Fix

This Jira was created prior to getting input from the Kafka Streams developers 
regarding KIP-848 changes. They've stated that this is not the approach they'll 
take, so closing as won't fix.

> Update streams_broker_down_resilience_test.py to support KIP-848’s group 
> protocol config
> 
>
> Key: KAFKA-16441
> URL: https://issues.apache.org/jira/browse/KAFKA-16441
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_broker_down_resilience_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrices.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16442) Update streams_standby_replica_test.py to support KIP-848’s group protocol config

2024-08-21 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16442.
---
Resolution: Won't Fix

This Jira was created prior to getting input from the Kafka Streams developers 
regarding KIP-848 changes. They've stated that this is not the approach they'll 
take, so closing as won't fix.

> Update streams_standby_replica_test.py to support KIP-848’s group protocol 
> config
> -
>
> Key: KAFKA-16442
> URL: https://issues.apache.org/jira/browse/KAFKA-16442
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, streams, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 4.0.0
>
>
> This task is to update the test method(s) in 
> {{streams_standby_replica_test.py}} to support the {{group.protocol}} 
> configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrices.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17363) Flaky test: kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, String).quorum=kraft+kip848.groupProtocol=consumer

2024-08-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17363:
--
Labels: integration-tests  (was: )

> Flaky test: 
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, 
> String).quorum=kraft+kip848.groupProtocol=consumer
> --
>
> Key: KAFKA-17363
> URL: https://issues.apache.org/jira/browse/KAFKA-17363
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: integration-tests
>
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, 
> String).quorum=kraft+kip848.groupProtocol=consumer
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16890/3/testReport/kafka.api/PlaintextConsumerCommitTest/Build___JDK_8_and_Scala_2_12___testAutoCommitOnRebalance_String__String__quorum_kraft_kip848_groupProtocol_consumer/]
> {code:java}
> org.opentest4j.AssertionFailedError: Topic [topic2] metadata not propagated 
> after 6 ms
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
>   at 
> kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:931)
>   at kafka.utils.TestUtils$.createTopicWithAdmin(TestUtils.scala:474)
>   at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createTopic$1(KafkaServerTestHarness.scala:193)
>   at scala.util.Using$.resource(Using.scala:269)
>   at 
> kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:185)
>   at 
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(PlaintextConsumerCommitTest.scala:223)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.ut

[jira] [Updated] (KAFKA-17361) KAFKA producer thread hung or infinite wait state

2024-08-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17361:
--
Component/s: clients
 producer 

> KAFKA producer thread hung or infinite wait state 
> --
>
> Key: KAFKA-17361
> URL: https://issues.apache.org/jira/browse/KAFKA-17361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.5.0
>Reporter: N S N MURTHY
>Priority: Major
>  Labels: kafka-streams
>
> We noticed an issue where one producer thread is hung or in an infinite wait state.
> KAFKA: kafka-2.5.0
> KAFKA Client: 3.6.2
> We recovered by restarting our application. Please let me know your 
> comments on this.
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(java.base@17.0.8.1/Native Method)
> - waiting on
> at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
> - locked <0x1000ad72c348> (a 
> org.apache.kafka.clients.producer.internals.ProducerMetadata)
> at 
> org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
> - locked <0x1000ad72c348> (a 
> org.apache.kafka.clients.producer.internals.ProducerMetadata)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1132)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:981)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:260)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:44)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals

[jira] [Updated] (KAFKA-17363) Flaky test: kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, String).quorum=kraft+kip848.groupProtocol=consumer

2024-08-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17363:
--
Component/s: clients
 consumer

> Flaky test: 
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, 
> String).quorum=kraft+kip848.groupProtocol=consumer
> --
>
> Key: KAFKA-17363
> URL: https://issues.apache.org/jira/browse/KAFKA-17363
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Apoorv Mittal
>Priority: Major
>
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(String, 
> String).quorum=kraft+kip848.groupProtocol=consumer
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16890/3/testReport/kafka.api/PlaintextConsumerCommitTest/Build___JDK_8_and_Scala_2_12___testAutoCommitOnRebalance_String__String__quorum_kraft_kip848_groupProtocol_consumer/]
> {code:java}
> org.opentest4j.AssertionFailedError: Topic [topic2] metadata not propagated 
> after 6 ms
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
>   at 
> kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:931)
>   at kafka.utils.TestUtils$.createTopicWithAdmin(TestUtils.scala:474)
>   at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createTopic$1(KafkaServerTestHarness.scala:193)
>   at scala.util.Using$.resource(Using.scala:269)
>   at 
> kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:185)
>   at 
> kafka.api.PlaintextConsumerCommitTest.testAutoCommitOnRebalance(PlaintextConsumerCommitTest.scala:223)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at 
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
>   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.f

[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-08-19 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875038#comment-17875038
 ] 

Kirk True commented on KAFKA-17338:
---

Take a look at the {{AbstractConfig.originals()}} method. I believe it will 
return a map with the values the user provided, _before_ any defaults were 
applied.
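
A minimal sketch of how that could drive the validation (the placement inside 
{{postProcessParsedConfig()}} and the exact error wording are assumptions):
{code:java}
Map<String, Object> originals = originals(); // user-supplied values, before defaults
Object assignors = originals.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);

boolean assignorsProvided;
if (assignors == null)
    assignorsProvided = false;
else if (assignors instanceof List)
    assignorsProvided = !((List<?>) assignors).isEmpty();
else
    assignorsProvided = !String.valueOf(assignors).trim().isEmpty();

if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG))
        && assignorsProvided) {
    throw new ConfigException(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
        + " must be null or empty when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG
        + " is set to " + GroupProtocol.CONSUMER);
}
{code}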

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-08-15 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874075#comment-17874075
 ] 

Kirk True commented on KAFKA-17338:
---

[~m1a2st]—sounds good! Thanks!

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16221) IllegalStateException from Producer

2024-08-15 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874074#comment-17874074
 ] 

Kirk True commented on KAFKA-16221:
---

I'm working in this area of invalid transaction states. [~mjsax]—can I reopen 
and assign to myself?
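
For context, the skip-abort idea described in the issue below might look 
roughly like this on the Streams side (an illustration only, not the actual 
fix):
{code:java}
try {
    producer.abortTransaction();
} catch (IllegalStateException e) {
    // The producer is already in a fatal state (or a commitTransaction() call
    // timed out), so aborting is impossible; fall through and just close.
    log.warn("Skipping abortTransaction() while closing task dirty", e);
} finally {
    producer.close();
}
{code}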

> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
> Fix For: 3.7.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug around 
> internal TX state transitions, and the producer now throws an 
> IllegalStateException in situations where it previously swallowed an internal error.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug surfaces after `commitTransaction()` times out or after an 
> `InvalidProducerEpochException` from a `send()` call, leading to the call to 
> `abortTransaction()` – Kafka Streams does not currently track whether a 
> commit-TX is in progress.
> {code:java}
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) {code}
> and
> {code:java}
> [2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
> [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
> sending record to topic joined-counts for task 1_2 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> // followed by
> [2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
> clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
> transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
> Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at 
> org.apa

[jira] [Reopened] (KAFKA-17208) replica_scale_test.py fails for new consumer

2024-08-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reopened KAFKA-17208:
---

> replica_scale_test.py fails for new consumer
> 
>
> Key: KAFKA-17208
> URL: https://issues.apache.org/jira/browse/KAFKA-17208
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> {{replica_scale_test}}’s {{test_produce_consume}} fails when using a 
> {{group_protocol}} of {{CONSUMER}}:
>  
> {noformat}
> TimeoutError('replicas-consume-workload failed to finish in the expected 
> amount of time.')
> Traceback (most recent call last):
>   File 
> "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py",
>  line 183, in _do_run
> data = self.run_test()
>   File 
> "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py",
>  line 243, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/home/kafka/tests/kafkatest/tests/core/replica_scale_test.py", line 
> 116, in test_produce_consume
> consume_workload.wait_for_done(timeout_sec=600)
>   File "/home/kafka/tests/kafkatest/services/trogdor/trogdor.py", line 352, 
> in wait_for_done
> wait_until(lambda: self.done(),
>   File 
> "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: replicas-consume-workload failed to finish in 
> the expected amount of time.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17337) ConsumerConfig should default to CONSUMER for group.protocol

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17337:
--
Description: {{ConsumerConfig}}’s default value for {{group.protocol}} 
should be changed from {{CLASSIC}} to {{CONSUMER}} for 4.0.0.  (was: The 
{{ConsumerConfig}} default value for {{GROUP_PROTOCOL_CONFIG}} is 
{{{}CLASSIC{}}}: 
[source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L113].
 It should be changed to {{CONSUMER}} for 4.0.0.)

> ConsumerConfig should default to CONSUMER for group.protocol
> 
>
> Key: KAFKA-17337
> URL: https://issues.apache.org/jira/browse/KAFKA-17337
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> {{ConsumerConfig}}’s default value for {{group.protocol}} should be changed 
> from {{CLASSIC}} to {{CONSUMER}} for 4.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17337) ConsumerConfig should default to CONSUMER for group.protocol

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17337:
--
Summary: ConsumerConfig should default to CONSUMER for group.protocol  
(was: ConsumerConfig should default to CONSUMER for the group.protocol)

> ConsumerConfig should default to CONSUMER for group.protocol
> 
>
> Key: KAFKA-17337
> URL: https://issues.apache.org/jira/browse/KAFKA-17337
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> The {{ConsumerConfig}} default value for {{GROUP_PROTOCOL_CONFIG}} is 
> {{{}CLASSIC{}}}: 
> [source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L113].
>  It should be changed to {{CONSUMER}} for 4.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17337) ConsumerConfig should set the group.protocol default to CONSUMER

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17337:
--
Summary: ConsumerConfig should set the group.protocol default to CONSUMER  
(was: Make CONSUMER group protocol default)

> ConsumerConfig should set the group.protocol default to CONSUMER
> 
>
> Key: KAFKA-17337
> URL: https://issues.apache.org/jira/browse/KAFKA-17337
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> The {{ConsumerConfig}} default value for {{GROUP_PROTOCOL_CONFIG}} is 
> {{{}CLASSIC{}}}: 
> [source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L113].
>  It should be changed to {{CONSUMER}} for 4.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17337) ConsumerConfig should default to CONSUMER for the group.protocol

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17337:
--
Summary: ConsumerConfig should default to CONSUMER for the group.protocol  
(was: ConsumerConfig should set the group.protocol default to CONSUMER)

> ConsumerConfig should default to CONSUMER for the group.protocol
> 
>
> Key: KAFKA-17337
> URL: https://issues.apache.org/jira/browse/KAFKA-17337
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Affects Versions: 3.9.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.0.0
>
>
> The {{ConsumerConfig}} default value for {{GROUP_PROTOCOL_CONFIG}} is 
> {{{}CLASSIC{}}}: 
> [source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L113].
>  It should be changed to {{CONSUMER}} for 4.0.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17338) ConsumerConfig should prevent using partition assignors with CONSUMER group protocol

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17338:
--
Summary: ConsumerConfig should prevent using partition assignors with 
CONSUMER group protocol  (was: Add validation to ConsumerConfig to prevent 
using partition assignors with CONSUMER group protocol)

> ConsumerConfig should prevent using partition assignors with CONSUMER group 
> protocol
> 
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17338) Add validation to ConsumerConfig to prevent using partition assignors with CONSUMER group protocol

2024-08-13 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17338:
--
Description: 
{{ConsumerConfig}} should be updated to include additional validation in 
{{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
the value for {{partition.assignment.strategy}} must be either null or empty. 
Otherwise a {{ConfigException}} should be thrown.

This is somewhat of the inverse case of KAFKA-15773.

  was:
{{ConsumerConfig}} should be updated to include additional validation in{{{} 
postProcessParsedConfig(){}}}—when {{group.protocol}} is set to 
{{{}CONSUMER{}}}, the value for {{partition.assignment.strategy}} must be 
either null or empty. Otherwise a {{ConfigException}} should be thrown.

This is somewhat of the inverse case of KAFKA-15773.


> Add validation to ConsumerConfig to prevent using partition assignors with 
> CONSUMER group protocol
> --
>
> Key: KAFKA-17338
> URL: https://issues.apache.org/jira/browse/KAFKA-17338
> Project: Kafka
>  Issue Type: Task
>  Components: clients, config, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> {{ConsumerConfig}} should be updated to include additional validation in 
> {{postProcessParsedConfig()}}—when {{group.protocol}} is set to {{CONSUMER}}, 
> the value for {{partition.assignment.strategy}} must be either null or empty. 
> Otherwise a {{ConfigException}} should be thrown.
> This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17338) Add validation to ConsumerConfig to prevent using partition assignors with CONSUMER group protocol

2024-08-13 Thread Kirk True (Jira)
Kirk True created KAFKA-17338:
-

 Summary: Add validation to ConsumerConfig to prevent using 
partition assignors with CONSUMER group protocol
 Key: KAFKA-17338
 URL: https://issues.apache.org/jira/browse/KAFKA-17338
 Project: Kafka
  Issue Type: Task
  Components: clients, config, consumer
Reporter: Kirk True
Assignee: Kirk True


{{ConsumerConfig}} should be updated to include additional validation in{{{} 
postProcessParsedConfig(){}}}—when {{group.protocol}} is set to 
{{{}CONSUMER{}}}, the value for {{partition.assignment.strategy}} must be 
either null or empty. Otherwise a {{ConfigException}} should be thrown.

This is somewhat of the inverse case of KAFKA-15773.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17337) Make CONSUMER group protocol default

2024-08-13 Thread Kirk True (Jira)
Kirk True created KAFKA-17337:
-

 Summary: Make CONSUMER group protocol default
 Key: KAFKA-17337
 URL: https://issues.apache.org/jira/browse/KAFKA-17337
 Project: Kafka
  Issue Type: Task
  Components: clients, config, consumer
Affects Versions: 3.9.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


The {{ConsumerConfig}} default value for {{GROUP_PROTOCOL_CONFIG}} is 
{{CLASSIC}} 
([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L113]).
It should be changed to {{CONSUMER}} for 4.0.0.
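Once the default flips, applications that need to stay on the classic protocol would presumably opt out with an explicit override; a minimal sketch, assuming the {{GROUP_PROTOCOL_CONFIG}} constant and {{GroupProtocol}} enum shipped with 3.7+ clients:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;

// Pin the classic rebalance protocol explicitly so a future default change
// to CONSUMER does not silently alter this client's behavior.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
{code}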



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17335) Lack of default for missing URL encoding option for OAuth causes NPE

2024-08-13 Thread Kirk True (Jira)
Kirk True created KAFKA-17335:
-

 Summary: Lack of default for missing URL encoding option for OAuth 
causes NPE
 Key: KAFKA-17335
 URL: https://issues.apache.org/jira/browse/KAFKA-17335
 Project: Kafka
  Issue Type: Bug
  Components: clients, security
Affects Versions: 3.9.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


KAFKA-16345 added a new client configuration option 
{{SASL_OAUTHBEARER_HEADER_URLENCODE}}. This is an optional configuration, 
so the user doesn't need to provide it. When an {{AdminClientConfig}}, 
{{ConsumerConfig}}, or {{ProducerConfig}} object is created, it uses the 
default value of {{DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE}} if the value 
isn't present.

However, if the configuration is created as a plain {{Map}} or {{Properties}} 
and the {{sasl.oauthbearer.header.urlencode}} key isn't present, it can lead to 
a {{NullPointerException}}. This occurs because the code in 
{{AccessTokenRetriever.create()}} assumes that there's always a value present 
in the incoming {{configs}} parameter. If there is no entry for the 
{{sasl.oauthbearer.header.urlencode}} key in the map, a 
{{NullPointerException}} is thrown.

When using map-based configuration, one workaround is to explicitly add an 
entry to the map like so:
{code:java}
Map<String, Object> configs = new HashMap<>();
. . .
configs.put(SASL_OAUTHBEARER_HEADER_URLENCODE, 
DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
. . .
configureSomething(configs);{code}
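On the library side, the fix would presumably be a null-safe read that falls back to the default; a minimal sketch of that pattern (the surrounding {{AccessTokenRetriever.create()}} logic is assumed, not quoted):
{code:java}
import java.util.Map;

// Null-safe lookup: fall back to the default when the caller's config map has
// no entry for sasl.oauthbearer.header.urlencode. The fallback is assumed to
// mirror DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE (false).
static boolean headerUrlencode(Map<String, ?> configs) {
    Boolean value = (Boolean) configs.get("sasl.oauthbearer.header.urlencode");
    return value != null ? value : false;
}
{code}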



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16818) Move event processing-related tests from ConsumerNetworkThreadTest to ApplicationEventProcessorTest

2024-08-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873053#comment-17873053
 ] 

Kirk True commented on KAFKA-16818:
---

Yes. Those tests are all basically testing that the 
{{ApplicationEventProcessor}} invokes the correct {{process()}} method given 
the input, so they make more sense in the {{ApplicationEventProcessorTest}} 
class.

> Move event processing-related tests from ConsumerNetworkThreadTest to 
> ApplicationEventProcessorTest
> ---
>
> Key: KAFKA-16818
> URL: https://issues.apache.org/jira/browse/KAFKA-16818
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, unit tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> The {{ConsumerNetworkThreadTest}} currently has a number of tests which do 
> the following:
>  # Add event of type _T_ to the event queue
>  # Call {{ConsumerNetworkThread.runOnce()}} to dequeue the events and call 
> {{ApplicationEventProcessor.process()}}
>  # Verify that the appropriate {{ApplicationEventProcessor}} process method 
> was invoked for the event
> Those types of tests should be moved to {{ApplicationEventProcessorTest}}.
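A toy illustration of the dispatch-style test shape being moved ({{Event}} and {{EventProcessor}} below are stand-ins for the real {{ApplicationEvent}}/{{ApplicationEventProcessor}} internals; names and signatures are illustrative):
{code:java}
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.jupiter.api.Test;

class DispatchShapeTest {
    interface Event {}
    static class PollEvent implements Event {}
    interface EventProcessor { void process(Event event); }

    @Test
    void processorReceivesEachQueuedEvent() {
        EventProcessor processor = mock(EventProcessor.class);
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        queue.add(new PollEvent());

        // Conceptually what ConsumerNetworkThread.runOnce() does: drain the
        // queue and hand each event to the processor. The relocated tests can
        // skip the thread and exercise the processor's dispatch directly.
        queue.forEach(processor::process);

        verify(processor).process(any(PollEvent.class));
    }
}
{code}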



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed

2024-08-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15588:
-

Assignee: (was: Lianet Magrans)

> Purge the unsent offset commits/fetches when the member is fenced/failed
> 
>
> Key: KAFKA-15588
> URL: https://issues.apache.org/jira/browse/KAFKA-15588
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support, reconciliation
> Fix For: 4.0.0
>
>
> When the member is fenced/failed, we should purge the in-flight offset commits 
> and fetches. The HeartbeatRequestManager should be able to handle this.
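A toy sketch of the purge-on-fence pattern described above (hypothetical names; the real logic would live alongside {{HeartbeatRequestManager}}):
{code:java}
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.CommitFailedException;

// On fence/failure, fail any unsent commit futures so callers unblock
// promptly, then drop them from the outbound queue.
static void purgeOnFence(Collection<CompletableFuture<Void>> unsentCommits) {
    unsentCommits.forEach(f -> f.completeExceptionally(new CommitFailedException()));
    unsentCommits.clear();
}
{code}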



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-08-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-15284.
---
Resolution: Won't Fix

This is not needed given the current approach the broker is using.

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”

2024-08-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16022:
--
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> AsyncKafkaConsumer sometimes complains “No current assignment for partition 
> {}”
> ---
>
> Key: KAFKA-16022
> URL: https://issues.apache.org/jira/browse/KAFKA-16022
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> This seems to be a timing issue: before the member receives any 
> assignment from the coordinator, the fetcher tries to find the current 
> position, causing "No current assignment for partition {}". This creates a 
> small amount of noise in the log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15909) Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer

2024-08-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872946#comment-17872946
 ] 

Kirk True commented on KAFKA-15909:
---

[~yangpoan]—feel free to assign it to yourself and have a go. Thanks!

> Throw error when consumer configured with empty/whitespace-only group.id for 
> LegacyKafkaConsumer
> 
>
> Key: KAFKA-15909
> URL: https://issues.apache.org/jira/browse/KAFKA-15909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
> Fix For: 4.0.0
>
>
> Per 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer],
>  the use of an empty value for {{group.id}} configuration was deprecated back 
> in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see 
> KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an 
> error in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-08-12 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872944#comment-17872944
 ] 

Kirk True commented on KAFKA-16792:
---

[~yangpoan]—feel free to assign it to yourself. Thanks!

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testCurrentLag
>  - testFetchStableOffsetThrowInPoll
>  - testListOffsetShouldUpdateSubscriptions
>  - testPollReturnsRecords
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-08-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14567:
--
Labels: eos transactions  (was: eos)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos, transactions
> Fix For: 4.0.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> or

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-08-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14567:
--
Component/s: producer 

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
> Fix For: 4.0.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.proce
