[jira] [Updated] (KAFKA-16933) New consumer leave group flow may not send leave request and clear epoch

2024-06-11 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16933:
---
Priority: Blocker  (was: Major)

> New consumer leave group flow may not send leave request and clear epoch
> 
>
> Key: KAFKA-16933
> URL: https://issues.apache.org/jira/browse/KAFKA-16933
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When leaving a group (due to close or unsubscribe), the consumer needs to run 
> revocation callbacks. In the case where this callbacks fail (user error or 
> timeout due to long running callback), the consumer should still clear its 
> epoch and send a leave group heartbeat. This is currently the case for 
> unsubscribe but not for close, so should be fixed.
> Also note that clearing the epoch even if the callback fails is needed to 
> ensure that the last epoch is not used anymore in offset commit requests 
> after the member leaves the group (ex. when auto-commit enabled).  
> Also note that this should still be applied (clear epoch) if a member that is 
> running the callbacks to leave get fenced. It does not need to send the leave 
> group request but should take all leaving action to leave the client in a 
> consistent state (avoiding auto-commits that could happen while it leaves, 
> that would use the fenced epoch) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16933) New consumer leave group flow may not send leave request and clear epoch

2024-06-11 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16933:
---
Fix Version/s: 3.8.0

> New consumer leave group flow may not send leave request and clear epoch
> 
>
> Key: KAFKA-16933
> URL: https://issues.apache.org/jira/browse/KAFKA-16933
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When leaving a group (due to close or unsubscribe), the consumer needs to run 
> revocation callbacks. In the case where this callbacks fail (user error or 
> timeout due to long running callback), the consumer should still clear its 
> epoch and send a leave group heartbeat. This is currently the case for 
> unsubscribe but not for close, so should be fixed.
> Also note that clearing the epoch even if the callback fails is needed to 
> ensure that the last epoch is not used anymore in offset commit requests 
> after the member leaves the group (ex. when auto-commit enabled).  
> Also note that this should still be applied (clear epoch) if a member that is 
> running the callbacks to leave get fenced. It does not need to send the leave 
> group request but should take all leaving action to leave the client in a 
> consistent state (avoiding auto-commits that could happen while it leaves, 
> that would use the fenced epoch) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16933) New consumer leave group flow may not send leave request and clear epoch

2024-06-11 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16933:
---
Affects Version/s: 3.7.0

> New consumer leave group flow may not send leave request and clear epoch
> 
>
> Key: KAFKA-16933
> URL: https://issues.apache.org/jira/browse/KAFKA-16933
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When leaving a group (due to close or unsubscribe), the consumer needs to run 
> revocation callbacks. In the case where this callbacks fail (user error or 
> timeout due to long running callback), the consumer should still clear its 
> epoch and send a leave group heartbeat. This is currently the case for 
> unsubscribe but not for close, so should be fixed.
> Also note that clearing the epoch even if the callback fails is needed to 
> ensure that the last epoch is not used anymore in offset commit requests 
> after the member leaves the group (ex. when auto-commit enabled).  
> Also note that this should still be applied (clear epoch) if a member that is 
> running the callbacks to leave get fenced. It does not need to send the leave 
> group request but should take all leaving action to leave the client in a 
> consistent state (avoiding auto-commits that could happen while it leaves, 
> that would use the fenced epoch) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16933) New consumer leave group flow may not send leave request and clear epoch

2024-06-11 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16933:
---
Labels: kip-848-client-support  (was: )

> New consumer leave group flow may not send leave request and clear epoch
> 
>
> Key: KAFKA-16933
> URL: https://issues.apache.org/jira/browse/KAFKA-16933
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> When leaving a group (due to close or unsubscribe), the consumer needs to run 
> revocation callbacks. In the case where this callbacks fail (user error or 
> timeout due to long running callback), the consumer should still clear its 
> epoch and send a leave group heartbeat. This is currently the case for 
> unsubscribe but not for close, so should be fixed.
> Also note that clearing the epoch even if the callback fails is needed to 
> ensure that the last epoch is not used anymore in offset commit requests 
> after the member leaves the group (ex. when auto-commit enabled).  
> Also note that this should still be applied (clear epoch) if a member that is 
> running the callbacks to leave get fenced. It does not need to send the leave 
> group request but should take all leaving action to leave the client in a 
> consistent state (avoiding auto-commits that could happen while it leaves, 
> that would use the fenced epoch) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16933) New consumer leave group flow may not send leave request and clear epoch

2024-06-11 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16933:
--

 Summary: New consumer leave group flow may not send leave request 
and clear epoch
 Key: KAFKA-16933
 URL: https://issues.apache.org/jira/browse/KAFKA-16933
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


When leaving a group (due to close or unsubscribe), the consumer needs to run 
revocation callbacks. In the case where this callbacks fail (user error or 
timeout due to long running callback), the consumer should still clear its 
epoch and send a leave group heartbeat. This is currently the case for 
unsubscribe but not for close, so should be fixed.

Also note that clearing the epoch even if the callback fails is needed to 
ensure that the last epoch is not used anymore in offset commit requests after 
the member leaves the group (ex. when auto-commit enabled).  

Also note that this should still be applied (clear epoch) if a member that is 
running the callbacks to leave get fenced. It does not need to send the leave 
group request but should take all leaving action to leave the client in a 
consistent state (avoiding auto-commits that could happen while it leaves, that 
would use the fenced epoch) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-06-07 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846315#comment-17846315
 ] 

Lianet Magrans edited comment on KAFKA-16758 at 6/7/24 1:28 PM:


Hey, I would be happy to take on this one! I expect I could try to make time 
for it after 3.8. Thanks!


was (Author: JIRAUSER300183):
Hey, I would be happy to take on this one! I expect I could make time for it 
after 3.8. Thanks!

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
> Fix For: 4.0.0
>
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out to the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16786) New consumer should not require the deprecated partition.assignment.strategy

2024-06-06 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16786.
--

> New consumer should not require the deprecated partition.assignment.strategy
> 
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with it's validation for non-empty on subscribe 
> (only use it has)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16786) New consumer should not require the deprecated partition.assignment.strategy

2024-06-05 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16786:
---
Summary: New consumer should not require the deprecated 
partition.assignment.strategy  (was: New consumer subscribe should not require 
the deprecated partition.assignment.strategy)

> New consumer should not require the deprecated partition.assignment.strategy
> 
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with it's validation for non-empty on subscribe 
> (only use it has)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16823) Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest

2024-06-05 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852442#comment-17852442
 ] 

Lianet Magrans commented on KAFKA-16823:


Sure, feel free to take it, I don't have bandwidth at the moment. Thanks!

> Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest 
> --
>
> Key: KAFKA-16823
> URL: https://issues.apache.org/jira/browse/KAFKA-16823
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Currently the KafkaConsumerTest file contains unit tests that apply to both 
> consumer implementations, but also tests that apply to the legacy consumer 
> only. We should consider splitting the tests that apply to the legacy only 
> into their own LegacyConsumerTest file (aligning with the existing 
> AsyncKafkaConsumerTest). End result would be: 
> KafkaConsumerTest -> unit tests that apply to both consumers. 
> LegacyKafkaConsumerTest -> unit tests that apply only to the 
> LegacyKafkaConsumer, either because of the logic they test, or the way they 
> are written (file to be created with this task)
> AsyncKafkaConsumerTest -> unit tests that apply only to the 
> AsyncKafkaConsumer (this file already exist)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-31 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851115#comment-17851115
 ] 

Lianet Magrans commented on KAFKA-15305:


Makes sense, I was concerned about the actual poll just because I briefly 
forgot that the close blocks on the LeaveGroupEvent 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1287.]
 This is the magic bit. It means that before shutting down the network thread 
we ensure the HB manager poll and the request added to the network thread (done 
sequentially as your point 3). All good. 

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16766) New consumer offsetsForTimes timeout exception has wrong message

2024-05-31 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16766.
--

> New consumer offsetsForTimes timeout exception has wrong message
> 
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but do not have a clear message in the Async, so we should fix them all 3.
> With the fix, we should write tests for each func, like the ones defined for 
> the Legacy Consumer 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
>  Note that we would need different tests, added to AsyncKafkaConsumerTest, 
> given that the AsyncKafkaConsumer issues a FindCoordinator request in this 
> case (on manager poll), but the LegacyKafkaConsumer does not, so it does not 
> account for that when matching requests/responses in the current tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850779#comment-17850779
 ] 

Lianet Magrans commented on KAFKA-16639:


Hey [~chia7712] , totally agree. The PR in this ticket is ensuring the "create 
request" part: IF the HBManager is polled when the member wants to leave, it 
will create the leave request (no matter the in-flights). But we also need to 
make sure that the manager is actually polled (as you had suggested initially, 
but this PR ended up focusing on the create part as you said). Follow-up 
comments related to that on KAFKA-15305

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850778#comment-17850778
 ] 

Lianet Magrans commented on KAFKA-15305:


Agree with this missing bit, but I wonder if we need a bit more than just "wait 
for pending requests", because I don't see that we'll ever generate one on the 
close situation. With KAFKA-16639 we ensure that HB request to leave is 
created, but that happens only IF the HB manager is polled when closing the 
consumer. On close, looks to me that may not happen, because 
[here|https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1232]
 is where the close operation triggers the leave event (that will just leave 
the state machine indicating LEAVING, to be used by the HB manager next poll), 
but right after we shutdown the network thread 
[here|https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1237],
 so not polling managers anymore. At this point, even with the inflight 
suggestion above, the networkClient.poll 
[here|https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L299]
 would return empty I expect. 

So seems we also need to make sure that the HB manager is actually polled when 
closing the consumer (the HBManager.pollOnClose you had suggested at some 
point). With that, at 
[this|https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L216-L217]
 point we would actually generate the HB to leave, add then we would hit the 
logic you mentioned above with a request to send (not empty), 
[here|https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L299].
 Makes sense?

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-28 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850044#comment-17850044
 ] 

Lianet Magrans edited comment on KAFKA-16639 at 5/28/24 4:57 PM:
-

I reassigned the task to [~frankvicky] who is the one working on this one 
(linked PR). Thanks [~frankvicky]! Follow-up comments on the PR. 


was (Author: JIRAUSER300183):
I reassigned the task to [~frankvicky] who is the one working on this one 
(linked PR). Thanks [~frankvicky]! Follow-up comment on the PR. 

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16816) Remove unneeded FencedInstanceId support on commit path for new consumer

2024-05-28 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16816:
--

Assignee: Lianet Magrans

> Remove unneeded FencedInstanceId support on commit path for new consumer
> 
>
> Key: KAFKA-16816
> URL: https://issues.apache.org/jira/browse/KAFKA-16816
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> The new consumer contains logic related to handling FencedInstanceId 
> exception received as a response to an OffsetCommit request (on the 
> [consumer|https://github.com/apache/kafka/blob/028e7a06dcdca7d4dbeae83f2fce0a4120cc2753/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L776]
>  and [commit 
> manager|https://github.com/apache/kafka/blob/028e7a06dcdca7d4dbeae83f2fce0a4120cc2753/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L715]),
>  but with the new group protocol, we will never get that error on a commit 
> response. We should remove the code that expects the FencedInstanceId on the 
> commit response, and also clean up the other related usages that we added to 
> propagate the FencedInstanceId exception on the poll, commitSync and 
> commitAsync API. Note that throwing that exception is part of the contract of 
> the poll, commitSync and commitAsync APIs of the KafkaConsumer, but it 
> changes with the new protocol. We should update the java doc for the new 
> AsyncKafkaConsumer to reflect the change.  
>  
> With the new protocol If a consumer tries to commit offsets, there could be 2 
> cases:
>  # empty group -> commit succeeds, fencing an instance id would never happen 
> because group is empty
>  # non-empty group -> commit fails with UnknownMemberId, indicating that the 
> member is not known to the group. The consumer needs to join the non-empty 
> group in order to commit offsets to it. To complete the story, the moment the 
> consumer attempts to join, it will receive an UnreleasedInstanceId error on 
> the HB response, indicating it using a groupInstanceId that is already in use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-28 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850044#comment-17850044
 ] 

Lianet Magrans commented on KAFKA-16639:


I reassigned the task to [~frankvicky] who is the one working on this one 
(linked PR). Thanks [~frankvicky]! Follow-up comment on the PR. 

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-28 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16639:
--

Assignee: TengYao Chi  (was: Philip Nee)

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16418) Review/split long-running admin client integration tests

2024-05-27 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849816#comment-17849816
 ] 

Lianet Magrans edited comment on KAFKA-16418 at 5/27/24 7:29 PM:
-

Reviewed in more detail and the mean exec time for this test has remained 
stable (~16 mins), see graphs 
[here|https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FToronto=trunk=kafka.api.PlaintextAdminIntegrationTest]
 This test is not affected by the parametrization that caused the spike in the 
similar consumer test, so not really a situation to address here. Closing this. 


was (Author: JIRAUSER300183):
Reviewed in more detail and the mean exec time for this test has remained 
stable (~16 mins), see graphs 
[here|[https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FToronto=trunk=kafka.api.PlaintextAdminIntegrationTest].]
 This test is not affected by the parametrization that caused the spike in the 
similar consumer test, so not really a situation to address here. Closing this. 

> Review/split long-running admin client integration tests
> 
>
> Key: KAFKA-16418
> URL: https://issues.apache.org/jira/browse/KAFKA-16418
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
> parallelization and improve build times. This tests is the longest running 
> integration test in kafka.api, so a similar approach to what has been done 
> with the consumer tests in PlaintextConsumerTest should be a good 
> improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16418) Review/split long-running admin client integration tests

2024-05-27 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16418.
--

> Review/split long-running admin client integration tests
> 
>
> Key: KAFKA-16418
> URL: https://issues.apache.org/jira/browse/KAFKA-16418
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
> parallelization and improve build times. This tests is the longest running 
> integration test in kafka.api, so a similar approach to what has been done 
> with the consumer tests in PlaintextConsumerTest should be a good 
> improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16418) Review/split long-running admin client integration tests

2024-05-27 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16418.

Resolution: Not A Problem

> Review/split long-running admin client integration tests
> 
>
> Key: KAFKA-16418
> URL: https://issues.apache.org/jira/browse/KAFKA-16418
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
> parallelization and improve build times. This tests is the longest running 
> integration test in kafka.api, so a similar approach to what has been done 
> with the consumer tests in PlaintextConsumerTest should be a good 
> improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16418) Review/split long-running admin client integration tests

2024-05-27 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849816#comment-17849816
 ] 

Lianet Magrans commented on KAFKA-16418:


Reviewed in more detail and the mean exec time for this test has remained 
stable (~16 mins), see graphs 
[here|[https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FToronto=trunk=kafka.api.PlaintextAdminIntegrationTest].]
 This test is not affected by the parametrization that caused the spike in the 
similar consumer test, so not really a situation to address here. Closing this. 

> Review/split long-running admin client integration tests
> 
>
> Key: KAFKA-16418
> URL: https://issues.apache.org/jira/browse/KAFKA-16418
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Review PlaintextAdminIntegrationTest and attempt to split it to allow for 
> parallelization and improve build times. This tests is the longest running 
> integration test in kafka.api, so a similar approach to what has been done 
> with the consumer tests in PlaintextConsumerTest should be a good 
> improvement. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2024-05-27 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15283:
---
Fix Version/s: (was: 4.0.0)

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support, newbie, offset
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide it when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic name for potential clean if not needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file

2024-05-27 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16675.
--

> Move rebalance callback test for positions to callbacks test file
> -
>
> Key: KAFKA-16675
> URL: https://issues.apache.org/jira/browse/KAFKA-16675
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Integration test 
> testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was 
> added to the PlaintextConsumerTest.scala in this PR 
> https://github.com/apache/kafka/pull/15856, as there was no specific file for 
> testing callbacks at the moment. Another PR is in-flight, adding the file for 
> callback-related tests, https://github.com/apache/kafka/pull/15408. Once 
> 15408 gets merged, we should move 
> testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception has wrong message

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Summary: New consumer offsetsForTimes timeout exception has wrong message  
(was: New consumer offsetsForTimes timeout exception does not have the proper 
message)

> New consumer offsetsForTimes timeout exception has wrong message
> 
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but do not have a clear message in the Async, so we should fix them all 3.
> With the fix, we should write tests for each func, like the ones defined for 
> the Legacy Consumer 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
>  Note that we would need different tests, added to AsyncKafkaConsumerTest, 
> given that the AsyncKafkaConsumer issues a FindCoordinator request in this 
> case (on manager poll), but the LegacyKafkaConsumer does not, so it does not 
> account for that when matching requests/responses in the current tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16766:
--

Assignee: Lianet Magrans

> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but do not have a clear message in the Async, so we should fix them all 3.
> With the fix, we should write tests for each func, like the ones defined for 
> the Legacy Consumer 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
>  Note that we would need different tests, added to AsyncKafkaConsumerTest, 
> given that the AsyncKafkaConsumer issues a FindCoordinator request in this 
> case (on manager poll), but the LegacyKafkaConsumer does not, so it does not 
> account for that when matching requests/responses in the current tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset
 - testResetUsingAutoResetPolicy
 - testPollReturnsRecords

 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset
 - testResetUsingAutoResetPolicy

 


> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset
 - testResetUsingAutoResetPolicy

 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset

 - 
testResetUsingAutoResetPolicy

 


> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - testResetToCommittedOffset

 - 
testResetUsingAutoResetPolicy

 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - 
testResetToCommittedOffset
 


> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - 
> testResetUsingAutoResetPolicy
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
called continuously, the consumer is not able to 
initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests,. All fail even with the current patch in KAFKA-16637 so 
needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too and 
KafkaConsumerTest.testResetToCommittedOffset (also failing due to fetch 
committed offsets never completing when polling with ZERO)

The issue seems to be around calling poll with ZERO timeout, that even when 
called continuously, the consumer is not able to 
initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to 

[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 - 
testResetToCommittedOffset
 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 


> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - 
> testResetToCommittedOffset
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-23 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too and 
KafkaConsumerTest.testResetToCommittedOffset (also failing due to fetch 
committed offsets never completing when polling with ZERO)

The issue seems to be around calling poll with ZERO timeout, that even when 
called continuously, the consumer is not able to 
initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests,. All fail even with the current patch in KAFKA-16637 so 
needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
called continuously, the consumer is not able to 
initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to 

[jira] [Assigned] (KAFKA-16815) Handle FencedInstanceId on heartbeat for new consumer

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16815:
--

Assignee: Lianet Magrans

> Handle FencedInstanceId on heartbeat for new consumer
> -
>
> Key: KAFKA-16815
> URL: https://issues.apache.org/jira/browse/KAFKA-16815
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> With the new consumer group protocol, a member could receive a 
> FencedInstanceIdError in the heartbeat response. This could be the case when 
> an active member using a group instance id is removed from the group by an 
> admin client. If a second member joins with the same instance id, the first 
> member will receive a FencedInstanceId on the next heartbeat response. This 
> should be treated as a fatal error (consumer should not attempt to rejoin). 
> Currently, the FencedInstanceId is not explicitly handled by the client in 
> the HeartbeatRequestManager. It ends up being treated as a fatal error, see 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L417]
>  (just because it lands on the "unexpected" error category). We should handle 
> it explicitly, just to make sure that we express that it's is an expected 
> error: log a proper message for it and fail (handleFatalFailure). We should 
> also that the error is included in the tests that cover the HB request error 
> handling 
> ([here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L798])
>     



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16823) Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest

2024-05-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16823:
--

 Summary: Extract LegacyConsumer-specific unit tests from generic 
KafkaConsumerTest 
 Key: KAFKA-16823
 URL: https://issues.apache.org/jira/browse/KAFKA-16823
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


Currently the KafkaConsumerTest file contains unit tests that apply to both 
consumer implementations, but also tests that apply to the legacy consumer 
only. We should consider splitting the tests that apply to the legacy only into 
their own LegacyConsumerTest file (aligning with the existing 
AsyncKafkaConsumerTest). End result would be: 

KafkaConsumerTest -> unit tests that apply to both consumers. 

LegacyKafkaConsumerTest -> unit tests that apply only to the 
LegacyKafkaConsumer, either because of the logic they test, or the way they are 
written (file to be created with this task)

AsyncKafkaConsumerTest -> unit tests that apply only to the AsyncKafkaConsumer 
(this file already exist)

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException".

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the Async, so we should fix them all 3.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the AsyncKafkaConsumer issues a FindCoordinator request in this case 
(on manager poll), but the LegacyKafkaConsumer does not, so it does not account 
for that when matching requests/responses in the current tests.

  was:
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the Async, so we should fix them all 3.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the async consumer issues a FindCoordinator request in this case, 
but the AsyncConsumer does, so it does not account for that when matching 
requests/responses in the current tests.


> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message 

[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Component/s: clients

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Priority: Blocker  (was: Major)

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Priority: Blocker  (was: Major)

> New consumer should throw NoOffsetForPartitionException on continuous poll 
> zero if no reset strategy
> 
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If the consumer does not define an offset reset strategy, a call to poll 
> should fail with NoOffsetForPartitionException. That works as expected on the 
> new consumer when polling with a timeout > 0 (existing integration test 
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
>  but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes 
> for the legacy consumer but fails for the new consumer). We should add it as 
> part of the fix, for better coverage:
> {code:java}
>   @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
> groupProtocol: String): Unit = {
> this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
> "none")
> val consumer = createConsumer(configOverrides = this.consumerConfig)
> consumer.assign(List(tp).asJava)
> // continuous poll should eventually fail because there is no offset 
> reset strategy set (fail only when resetting positions after coordinator is 
> known)
> TestUtils.tryUntilNoAssertionError() {
>   assertThrows(classOf[NoOffsetForPartitionException], () => 
> consumer.poll(Duration.ZERO))
> }
>   }
> {code}
> Also this is covered in the unit test 
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
>  that is currently enabled only for the LegacyConsumer. After fixing this 
> issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout, that even when 
> called continuously, the consumer is not able to 
> initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it 
> to 
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
>  where the exception is thrown.
>  
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
> but filing this one to provide more context and point out the test failures 
> and suggested new tests,. All fail even with the current patch in KAFKA-16637 
> so needs investigation. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Fix Version/s: 3.8.0

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16816) Remove unneeded FencedInstanceId support on commit path for new consumer

2024-05-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16816:
--

 Summary: Remove unneeded FencedInstanceId support on commit path 
for new consumer
 Key: KAFKA-16816
 URL: https://issues.apache.org/jira/browse/KAFKA-16816
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


The new consumer contains logic related to handling FencedInstanceId exception 
received as a response to an OffsetCommit request (on the 
[consumer|https://github.com/apache/kafka/blob/028e7a06dcdca7d4dbeae83f2fce0a4120cc2753/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L776]
 and [commit 
manager|https://github.com/apache/kafka/blob/028e7a06dcdca7d4dbeae83f2fce0a4120cc2753/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L715]),
 but with the new group protocol, we will never get that error on a commit 
response. We should remove the code that expects the FencedInstanceId on the 
commit response, and also clean up the other related usages that we added to 
propagate the FencedInstanceId exception on the poll, commitSync and 
commitAsync API. Note that throwing that exception is part of the contract of 
the poll, commitSync and commitAsync APIs of the KafkaConsumer, but it changes 
with the new protocol. We should update the java doc for the new 
AsyncKafkaConsumer to reflect the change.  

 

With the new protocol If a consumer tries to commit offsets, there could be 2 
cases:
 # empty group -> commit succeeds, fencing an instance id would never happen 
because group is empty
 # non-empty group -> commit fails with UnknownMemberId, indicating that the 
member is not known to the group. The consumer needs to join the non-empty 
group in order to commit offsets to it. To complete the story, the moment the 
consumer attempts to join, it will receive an UnreleasedInstanceId error on the 
HB response, indicating it using a groupInstanceId that is already in use.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16815) Handle FencedInstanceId on heartbeat for new consumer

2024-05-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16815:
--

 Summary: Handle FencedInstanceId on heartbeat for new consumer
 Key: KAFKA-16815
 URL: https://issues.apache.org/jira/browse/KAFKA-16815
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


With the new consumer group protocol, a member could receive a 
FencedInstanceIdError in the heartbeat response. This could be the case when an 
active member using a group instance id is removed from the group by an admin 
client. If a second member joins with the same instance id, the first member 
will receive a FencedInstanceId on the next heartbeat response. This should be 
treated as a fatal error (consumer should not attempt to rejoin). 

Currently, the FencedInstanceId is not explicitly handled by the client in the 
HeartbeatRequestManager. It ends up being treated as a fatal error, see 
[here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L417]
 (just because it lands on the "unexpected" error category). We should handle 
it explicitly, just to make sure that we express that it's is an expected 
error: log a proper message for it and fail (handleFatalFailure). We should 
also that the error is included in the tests that cover the HB request error 
handling 
([here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L798])

    



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-21 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848348#comment-17848348
 ] 

Lianet Magrans commented on KAFKA-14517:


Sure, I'll assign it to me then. Thanks [~phuctran] !

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14517) Implement regex subscriptions

2024-05-21 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-14517:
--

Assignee: Lianet Magrans  (was: Phuc Hong Tran)

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Summary: Enable consumer unit tests that fail to fetch offsets only for new 
consumer with poll(0)  (was: Enable related unit tests that fail to fetch 
offsets only for new consumer with to poll(0))

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16764:
--

Assignee: appchemist

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: appchemist
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example.The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes that 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847427#comment-17847427
 ] 

Lianet Magrans commented on KAFKA-16764:


Sure! Thanks for helping out! Follow-up on the PR. 

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example.The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes that 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16792) Enable related unit tests that fail only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16792:
--

 Summary: Enable related unit tests that fail only for new consumer 
with to poll(0)
 Key: KAFKA-16792
 URL: https://issues.apache.org/jira/browse/KAFKA-16792
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Lianet Magrans


Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll

- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Summary: Enable related unit tests that fail to fetch offsets only for new 
consumer with to poll(0)  (was: Enable related unit tests that fail only for 
new consumer with to poll(0))

> Enable related unit tests that fail to fetch offsets only for new consumer 
> with to poll(0)
> --
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
> - testFetchStableOffsetThrowInPoll
> - testCurrentLag
> - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16792) Enable related unit tests that fail to fetch offsets only for new consumer with to poll(0)

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16792:
---
Description: 
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
 - testFetchStableOffsetThrowInPoll
 - testCurrentLag
 - testListOffsetShouldUpdateSubscriptions
 

  was:
Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll

- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
 


> Enable related unit tests that fail to fetch offsets only for new consumer 
> with to poll(0)
> --
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-17 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847369#comment-17847369
 ] 

Lianet Magrans commented on KAFKA-14517:


hey [~phuctran], [~dajac] , I would like to take on this one if that's ok? 
Thanks!

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-17 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16758:
--

Assignee: Lianet Magrans

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: needs-kip
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out to the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy

2024-05-16 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16786:
---
Fix Version/s: 3.8

> New consumer subscribe should not require the deprecated 
> partition.assignment.strategy
> --
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8
>
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with it's validation for non-empty on subscribe 
> (only use it has)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy

2024-05-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846985#comment-17846985
 ] 

Lianet Magrans commented on KAFKA-16786:


Hey [~phuctran] , I noticed this issue while reviewing one of your PRs, where 
you had to provide the deprecated assignor to the new consumer. You suffered 
from this bug :) so if you have bandwidth I think it would be a good simple one 
for you to take. 

> New consumer subscribe should not require the deprecated 
> partition.assignment.strategy
> --
>
> Key: KAFKA-16786
> URL: https://issues.apache.org/jira/browse/KAFKA-16786
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> The partition.assignment.strategy config is deprecated with the new consumer 
> group protocol KIP-848. With the new protocol, server side assignors are 
> supported for now, defined in the property
> group.remote.assignor, and with default values selected by the broker, so 
> it's not even a required property. 
> The new AsyncKafkaConsumer supports the new protocol only, but it currently 
> throws an IllegalStateException if a call to subscribe is made and the 
> deprecated config partition.assignment.strategy is empty (see 
> [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
>  
> We should remove the reference to ConsumerPartitionAssignor in the 
> AsyncKafkaConsumer, along with it's validation for non-empty on subscribe 
> (only use it has)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy

2024-05-16 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16786:
--

 Summary: New consumer subscribe should not require the deprecated 
partition.assignment.strategy
 Key: KAFKA-16786
 URL: https://issues.apache.org/jira/browse/KAFKA-16786
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


The partition.assignment.strategy config is deprecated with the new consumer 
group protocol KIP-848. With the new protocol, server side assignors are 
supported for now, defined in the property
group.remote.assignor, and with default values selected by the broker, so it's 
not even a required property. 

The new AsyncKafkaConsumer supports the new protocol only, but it currently 
throws an IllegalStateException if a call to subscribe is made and the 
deprecated config partition.assignment.strategy is empty (see 
[throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]).
 
We should remove the reference to ConsumerPartitionAssignor in the 
AsyncKafkaConsumer, along with it's validation for non-empty on subscribe (only 
use it has)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
called continuously, the consumer is not able to 
initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests,. All fail even with the current patch in KAFKA-16637 so 
needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the 

[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests,. All fail even with the current patch in KAFKA-16637 so 
needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout. 

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:

{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}

Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too. 

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the  updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception 

[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16777:
--

 Summary: New consumer should throw NoOffsetForPartitionException 
on continuous poll zero if no reset strategy
 Key: KAFKA-16777
 URL: https://issues.apache.org/jira/browse/KAFKA-16777
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Lianet Magrans


If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout. 

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:

{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}

Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too. 

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the  updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846446#comment-17846446
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], sorry I had missed your last question. The new group 
rebalance protocol from KIP-848 is supported in KRaft mode only.

> AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
> -
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the Async, so we should fix them all 3.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the async consumer issues a FindCoordinator request in this case, 
but the AsyncConsumer does, so it does not account for that when matching 
requests/responses in the current tests.

  was:
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exist for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.


> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
>  but do not have a clear message in the Async, so we should fix them all 3.
> With the fix, we should 

[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16766:
--

 Summary: New consumer offsetsForTimes timeout exception does not 
have the proper message
 Key: KAFKA-16766
 URL: https://issues.apache.org/jira/browse/KAFKA-16766
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exist for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16764:
---
Description: 
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example.The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])

This is probably what makes that 
[testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
 fails for the new consumer. Once this bug is fixed, we should be able to 
enable that test for the new consumer.

  was:
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example.The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])


> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example.The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes that 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16764:
--

 Summary: New consumer should throw InvalidTopicException on poll 
when invalid topic in metadata
 Key: KAFKA-16764
 URL: https://issues.apache.org/jira/browse/KAFKA-16764
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example.The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16406:
---
Labels: consumer kip-848-client-support  (was: )

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer, kip-848-client-support
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This led to this being one of the longest-running integration tests, so in 
> the aim of reducing the impact on the build times we could split it to allow 
> for parallelization.  The tests covers multiple areas of the consumer logic, 
> in a single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the result wanted.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16406.
--

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This led to this being one of the longest-running integration tests, so in 
> the aim of reducing the impact on the build times we could split it to allow 
> for parallelization.  The tests covers multiple areas of the consumer logic, 
> in a single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the result wanted.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16406.

Resolution: Fixed

> Split long-running consumer integration test
> 
>
> Key: KAFKA-16406
> URL: https://issues.apache.org/jira/browse/KAFKA-16406
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> PlaintextConsumerTest contains integration tests for the consumer. Since the 
> introduction of the new consumer group protocol (KIP-848) and the new 
> KafkaConsumer, this test has been parametrized to run with multiple 
> combinations, making sure we test the logic for the old and new coordinator, 
> as well as for the legacy and new KafkaConsumer. 
> This led to this being one of the longest-running integration tests, so in 
> the aim of reducing the impact on the build times we could split it to allow 
> for parallelization.  The tests covers multiple areas of the consumer logic, 
> in a single file, so splitting based on the high-level features being tested 
> would be sensible and achieve the result wanted.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846315#comment-17846315
 ] 

Lianet Magrans commented on KAFKA-16758:


Hey, I would be happy to take on this one! I expect I could make time for it 
after 3.8. Thanks!

> Extend Consumer#close with option to leave the group or not
> ---
>
> Key: KAFKA-16758
> URL: https://issues.apache.org/jira/browse/KAFKA-16758
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the 
> full context.
> Essentially we would get rid of the "internal.leave.group.on.close" config 
> that is used as a backdoor by Kafka Streams right now to prevent closed 
> consumers from leaving the group, thus reducing unnecessary task movements 
> after a simple bounce. 
> This would be replaced by an actual public API that would allow the caller to 
> opt in or out to the LeaveGroup when close is called. This would be similar 
> to the KafkaStreams#close(CloseOptions) API, and in fact would be how that 
> API will be implemented (since it only works for static groups at the moment 
> as noted in KAFKA-16514 )
> This has several benefits over the current situation:
>  # It allows plain consumer apps to opt-out of leaving the group when closed, 
> which is currently not possible through any public API (only an internal 
> backdoor config)
>  # It enables the caller to dynamically select the appropriate action 
> depending on why the client is being closed – for example, you would not want 
> the consumer to leave the group during a simple restart, but would want it to 
> leave the group when shutting down the app or if scaling down the node. This 
> is not possible today, even with the internal config, since configs are 
> immutable
>  # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so 
> that the user's choice to leave the group during close will be respected for 
> non-static members



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16737:
--

Assignee: Lianet Magrans

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs where added, so we should review them all, enable tests for the new 
> protocol when applicable and removing the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java, testing logic specific to the 
> internals of the new consumer, but still many tests in the KafkaConsumerTest 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Affects Version/s: 3.7.0

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs where added, so we should review them all, enable tests for the new 
> protocol when applicable and removing the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java, testing logic specific to the 
> internals of the new consumer, but still many tests in the KafkaConsumerTest 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Fix Version/s: 3.8.0

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs where added, so we should review them all, enable tests for the new 
> protocol when applicable and removing the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java, testing logic specific to the 
> internals of the new consumer, but still many tests in the KafkaConsumerTest 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16737:
---
Priority: Blocker  (was: Major)

> Clean up KafkaConsumerTest TODOs enabling tests for new consumer
> 
>
> Key: KAFKA-16737
> URL: https://issues.apache.org/jira/browse/KAFKA-16737
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
> only enabled for the CLASSIC protocol and should be reviewed and enabled for 
> the new CONSUMER group protocol when applicable. Some tests also have TODOs 
> to enable them for the new consumer when certain features/bugs are addressed. 
> The new protocol and consumer implementation have evolved a lot since those 
> TODOs where added, so we should review them all, enable tests for the new 
> protocol when applicable and removing the TODOs from the code. Note that 
> there is another AsyncKafkaConsumerTest.java, testing logic specific to the 
> internals of the new consumer, but still many tests in the KafkaConsumerTest 
> apply to both the new and legacy consumer, and we should enable them for 
> both. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer

2024-05-13 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16737:
--

 Summary: Clean up KafkaConsumerTest TODOs enabling tests for new 
consumer
 Key: KAFKA-16737
 URL: https://issues.apache.org/jira/browse/KAFKA-16737
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Lianet Magrans


KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are 
only enabled for the CLASSIC protocol and should be reviewed and enabled for 
the new CONSUMER group protocol when applicable. Some tests also have TODOs to 
enable them for the new consumer when certain features/bugs are addressed. 

The new protocol and consumer implementation have evolved a lot since those 
TODOs where added, so we should review them all, enable tests for the new 
protocol when applicable and removing the TODOs from the code. Note that there 
is another AsyncKafkaConsumerTest.java, testing logic specific to the internals 
of the new consumer, but still many tests in the KafkaConsumerTest apply to 
both the new and legacy consumer, and we should enable them for both. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Fix Version/s: 3.8.0

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When a consumer poll iteration takes longer than the max.poll.interval, the 
> consumer logs a warn suggesting that the max.poll.interval config was 
> exceeded, and pro-actively leaves the group. The log suggests to consider 
> adjusting the max.poll.interval.config which should help in the cases of long 
> processing times. We should consider adding the info of how much time the 
> interval was exceeded, since it could be helpful in guiding the user to 
> effectively adjust the config. This is done in other clients, that log this 
> kind of messages in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16695:
---
Labels: kip-848-client-support  (was: )

> Improve expired poll interval logging by showing exceeded time
> --
>
> Key: KAFKA-16695
> URL: https://issues.apache.org/jira/browse/KAFKA-16695
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> When a consumer poll iteration takes longer than the max.poll.interval, the 
> consumer logs a warn suggesting that the max.poll.interval config was 
> exceeded, and pro-actively leaves the group. The log suggests to consider 
> adjusting the max.poll.interval.config which should help in the cases of long 
> processing times. We should consider adding the info of how much time the 
> interval was exceeded, since it could be helpful in guiding the user to 
> effectively adjust the config. This is done in other clients, that log this 
> kind of messages in this situation:
> {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
> max.poll.interval.ms for long-running message processing): leaving 
> group{quote}
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time

2024-05-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16695:
--

 Summary: Improve expired poll interval logging by showing exceeded 
time
 Key: KAFKA-16695
 URL: https://issues.apache.org/jira/browse/KAFKA-16695
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


When a consumer poll iteration takes longer than the max.poll.interval, the 
consumer logs a warn suggesting that the max.poll.interval config was exceeded, 
and pro-actively leaves the group. The log suggests to consider adjusting the 
max.poll.interval.config which should help in the cases of long processing 
times. We should consider adding the info of how much time the interval was 
exceeded, since it could be helpful in guiding the user to effectively adjust 
the config. This is done in other clients, that log this kind of messages in 
this situation:
{quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust 
max.poll.interval.ms for long-running message processing): leaving group{quote}
  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16665.
--

> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> ---
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as it happens with the legacy 
> consumer, that allows this call. This is actually used from places within 
> Kafka itself (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer is because the partitions that are assigned 
> but awaiting the onPartitionsAssigned callback, are excluded from the list of 
> partitions that should initialize. We should allow the partitions to 
> initialize their positions, without allowing to fetch data from them (which 
> is already achieve based on the isFetchable flag in the subscription state).
> Note that a partition position can be updated from 2 places: call to 
> consumer.position or call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even after 
> having a valid position after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> considers that the partitions has a valid position AND is not awaiting the 
> callback)
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887
 ] 

Lianet Magrans edited comment on KAFKA-16670 at 5/6/24 9:01 PM:


Hey [~chickenchickenlove], thanks for trying out this! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe (HeartbeatRequest) until it gets a response to the initial 
FindCoordinator request (HeartbeatManager skips sending requests if it does not 
know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has been 
received? (I would suggest just polling while true for a certain amount of 
time, no sleeping after the subscribe needed). 

This integration test for the consumer 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153]
 has a very similar logic to the one you're trying to achieve (create consumer, 
subscribe right away and consume), and since a new broker and consumer are 
setup for each test, the test will go down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. The main 
difference from looking at both seems to be the limited number of polls in your 
failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], thanks for trying out this! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe until it gets a response to the initial FindCoordinator request 
(HeartbeatManager skips sending requests if it does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's 

[jira] [Commented] (KAFKA-16670) KIP-848 : Consumer will not receive assignment forever because of concurrent issue.

2024-05-06 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843887#comment-17843887
 ] 

Lianet Magrans commented on KAFKA-16670:


Hey [~chickenchickenlove], thanks for trying out this! Some clarification in 
case it helps. In the flow you described, the new consumer will send a request 
to find the group coordinator (FindCoordinator) when it gets created, but even 
if there's a call to consumer.subscribe right after, it won't send a request to 
subscribe until it gets a response to the initial FindCoordinator request 
(HeartbeatManager skips sending requests if it does not know the coordinator 
[here|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L189]).
 Once the consumer gets a response for the FindCoordinator, a HeartbeatRequest 
request will be sent containing the new subscription. The consumer will then 
eventually receive the assignment, but we don't know exactly when from the 
consumer point of view. The rebalance callbacks are what signal to the consumer 
that the call to subscribe completed with an assignment received. So it's only 
after the consumer gets the assignment that a call to poll can return the 
records that are available. 

So based on those expectations and back to your example, we don't need to wait 
before calling subscribe (that's handled internally by the 
HeartbeatRequestManager as described above). I wonder if it's the fact that in 
the failed case you're polling 10 times only (vs. 100 times in the successful 
case)?? In order to receive records, we do need to make sure that we are 
calling poll after the assignment has been received (so the consumer issues a 
fetch request for the partitions assigned). Note that even when you poll for 1s 
in your test, a poll that happens before the assignment has been received, will 
block for 1s but it's doomed to return empty, because it is not waiting for 
records from the topics you're interested in (no partitions assigned yet). 
Could you make sure that the test is calling poll after the assignment has been 
received? (I would suggest just polling while true for a certain amount of 
time, no sleeping after the subscribe needed). 

This integration test for the consumer 
[testGroupConsumption|https://github.com/apache/kafka/blob/0b4eaefd863e911c211f3defccd75c0ae78e14d2/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L153]
 has a very similar logic to the one you're trying to achieve (create consumer, 
subscribe right away and consume), and since a new broker and consumer are 
setup for each test, the test will go down the same path of having to find a 
coordinator before sending the HeartbeatRequest with a subscription. The main 
difference from looking at both seems to be the limited number of polls in your 
failed test scenario, so let's try to rule that out to better isolate the 
situation. Hope it helps! Let me know

> KIP-848 : Consumer will not receive assignment forever because of concurrent 
> issue.
> ---
>
> Key: KAFKA-16670
> URL: https://issues.apache.org/jira/browse/KAFKA-16670
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Major
>
> *Related Code*
>  * Consumer get assignment Successfully :
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L35-L57]
>  * Consumer get be stuck Forever because of concurrent issue:
>  ** 
> [https://github.com/chickenchickenlove/new-consumer-error/blob/8c1d74db1ec60350c28f5ed25f595559180dc603/src/test/java/com/example/MyTest.java#L61-L79]
>  
> *Unexpected behaviour*
>  * 
> Broker is sufficiently slow.
>  * When a KafkaConsumer is created and immediately subscribes to a topic
> If both conditions are met, {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and become stuck indefinitely.
> In case of new broker and new consumer, when consumer are created, consumer 
> background thread send a request to broker. (I guess groupCoordinator 
> Heartbeat request). In that time, if broker does not load metadata from 
> {{{}__consumer_offset{}}}, broker will start to schedule load metadata. After 
> broker load metadata completely, consumer background thread think 'this 
> broker is valid group coordinator'.
> However, consumer can send {{subscribe}} request to broker before {{broker}} 
> reply about {{{}groupCoordinator HeartBeat Request{}}}. In that case, 
> consumer seems to be stuck.
> If both conditions are met, the {{Consumer}} can potentially never receive 
> {{TopicPartition}} assignments and may become indefinitely stuck. In the case 
> 

[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file

2024-05-06 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16675:
--

 Summary: Move rebalance callback test for positions to callbacks 
test file
 Key: KAFKA-16675
 URL: https://issues.apache.org/jira/browse/KAFKA-16675
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Integration test 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added 
to the PlaintextConsumerTest.scala in this PR 
https://github.com/apache/kafka/pull/15856, as there was no specific file for 
testing callbacks at the moment. Another PR is in-flight, adding the file for 
callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 
gets merged, we should move 
testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843324#comment-17843324
 ] 

Lianet Magrans commented on KAFKA-16474:


Yes, makes sense, that PR is addressing only point 1. of what you reported, so 
we still have the point about the poll frequency (maybe to address separately 
if we still believe it's a concern) 

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843318#comment-17843318
 ] 

Lianet Magrans commented on KAFKA-16474:


Hey [~pnee], this is the PR https://github.com/apache/kafka/pull/15723 from 
[~kirktrue] that fixes a bug that for sure was leading to double heartbeats (I 
guess it's what you were seeing here, to double check)

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16665:
---
Description: 
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

Note that a partition position can be updated from 2 places: call to 
consumer.position or call to consumer.poll. Both will attempt to 
`updateFetchPositions` if there is no valid position yet, but even after having 
a valid position after those calls, the partition will remain non-fetchable 
until the onPartitionsAssigned callback completes (fetchable considers that the 
partitions has a valid position AND is not awaiting the callback)

  

  was:
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  


> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> ---
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as it happens with the legacy 
> consumer, that allows this call. This is actually used from places within 
> Kafka itself (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer is because the partitions that are assigned 
> but awaiting the onPartitionsAssigned callback, are excluded from the list of 
> partitions that should initialize. We should allow the partitions to 
> initialize their positions, without allowing to fetch data from them (which 
> is already achieve based on the isFetchable flag in the subscription state).
> Note that a partition position can be updated from 2 places: call to 
> consumer.position or call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even after 
> having a valid position after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> considers that the partitions has a valid position AND is not awaiting the 
> callback)
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16665:
--

 Summary: Fail to get partition's position from within 
onPartitionsAssigned callback in new consumer 
 Key: KAFKA-16665
 URL: https://issues.apache.org/jira/browse/KAFKA-16665
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842765#comment-17842765
 ] 

Lianet Magrans commented on KAFKA-16514:


Just to answer the question above:
{quote}what is the purpose of -2 code? In the end, not sending any request, 
with a large enough session timeout, no rebalance would be triggered anyway? 
What does change is we send -2 instead of just not sending any leaver group 
request on close()?{quote}

The purpose is just to set the intention explicitly at the protocol level (and 
not assume it). This is mainly to allow for richer semantics around the static 
membership leave in the future. It does not make a difference at the moment 
(over not sending the leave group), but it does allow to cleanly extend the 
current logic if we ever want to, and allow static members to leave permanently 
by sending a -1 epoch on the leave group. That would effectively allow to 
remove a static members from a group (which can only be achieved now either 
waiting for the session timeout to expire, or via the admin api)

Anyways, that's just food for thought for now. The KIP extending the consumer 
close with options seems sensible to solve the current situation, and would 
align nicely with any future extension of the static leave semantics if we ever 
go down that path. 

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 8:25 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your client code again, locally 
starting a broker in kraft mode, with the default config, only adding 
`group.coordinator.rebalance.protocols=consumer,classic`. 1 topic, 1 partition, 
1 instance of your consumer app running with the poll duration of 1s, and was 
able to consume messages as expected. I only changed to StringDeserializers for 
simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest taking a 
look and share the broker logs to understand more about what's going on on your 
setup. If all looks good there maybe provide a ConsumerRebalanceListener to the 
call to subscribe, just to check/print the partitions assigned to your consumer 
on the onPartitionsAssigned callback. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps!

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | 

[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps!


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> 

[jira] [Comment Edited] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-

Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

bq. for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps


was (Author: JIRAUSER300183):
Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> 

[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-30 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842470#comment-17842470
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], just to rule out the basics, did you make sure to 
produce messages to the same test-topic1? No other consumers subscribed to it 
that could be owning the partition? I tried your code again, 1 topic, 1 
partition, 1 instance of your consumer app running with the poll duration of 
1s, and was able to consume messages as expected. I only changed to 
StringDeserializers for simplicity:

{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());{quote}

and produced a bunch of messages with:

{quote}for x in {1..10}; do echo "Test message  $x"; done | 
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
topic1{quote}

Then the consumer app with your code printed the 10 messages as expected. If 
after double checking that you're still facing issues, I would suggest to 
provide a ConsumerRebalanceListener to the call to subscribe, just to 
check/print the partitions assigned to your consumer on the 
onPartitionsAssigned callback, and also taking a look and share the broker logs 
to understand more about what's going on on your setup. Hope it helps

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16637) KIP-848 does not work well

2024-04-29 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842160#comment-17842160
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], thanks for reporting this. I wonder if you're 
hitting a known issue with the consumer api and timeous (being fixed with 
https://issues.apache.org/jira/browse/KAFKA-16200 and 
https://issues.apache.org/jira/browse/KAFKA-15974). I tried your code, changing 
only the call poll from what you had `kafkaConsumer.poll(Duration.ZERO)` , to 
provide a non-zero duration, and it all worked as expected. So I guess it could 
be related to the timeout enforcement issues being fixed on the consumer side. 

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>
> I want to test next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not works well. 
> You can check my condition.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16465) New consumer does not invoke rebalance callbacks as expected in consumer_test.py system test

2024-04-29 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16465.
--

> New consumer does not invoke rebalance callbacks as expected in 
> consumer_test.py system test
> 
>
> Key: KAFKA-16465
> URL: https://issues.apache.org/jira/browse/KAFKA-16465
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_static_consumer_bounce.clean_shutdown=True.static_membership=False.bounce_mode=all.num_bounces=5.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 29.511 seconds
> AssertionError('Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 254, in test_static_consumer_bounce
> (num_revokes_after_bounce, check_condition)
> AssertionError: Total revoked count 0 does not match the expectation of 
> having 0 revokes as 0
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16459) New consumer times out joining group in consumer_test.py system test

2024-04-29 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16459.
--

> New consumer times out joining group in consumer_test.py system test
> 
>
> Key: KAFKA-16459
> URL: https://issues.apache.org/jira/browse/KAFKA-16459
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{consumer_test.py}} system test fails with two different errors related 
> to consumers joining the consumer group in a timely fashion.
> {quote}
> * Consumers failed to join in a reasonable amount of time
> * Timed out waiting for consumers to join, expected total X joined, but only 
> see Y joined fromnormal consumer group and Z from conflict consumer 
> group{quote}
> Affected tests:
>  * {{test_fencing_static_consumer}}
>  * {{test_static_consumer_bounce}}
>  * {{test_static_consumer_persisted_after_rejoin}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841986#comment-17841986
 ] 

Lianet Magrans commented on KAFKA-16514:


If my understanding here is right, this seems like a fair requirement to have 
the ability to dynamically decide if a member should leave the group, 
regardless of its dynamic/static membership. We've actually had conversations 
about this while working on the new consumer group protocol, where static 
members do send a leave group request when leaving, and they send it with an 
epoch that explicitly indicates it's a temporary leaving (-2). For now this is 
the only way static members leave (temporarily), but the ground is set if ever 
in the future we decide that we want to allow static members to send a 
definitive leave group (ex. -1 epoch, like dynamic members do). So with this 
context, and back to the streams situation here, I wonder if we would be better 
overall if we bring a less hacky solution and enable a close with richer 
semantics at the consumer level, that would allow to have a close where 
options/params could be passed to dynamically indicate how to close 
(temporarily, definitely, ...), and then align this nicely with the new 
protocol (and make it work with the legacy one). Thoughts?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16628:
--

Assignee: Lianet Magrans

> Add system test for validating static consumer bounce and assignment when not 
> eager
> ---
>
> Key: KAFKA-16628
> URL: https://issues.apache.org/jira/browse/KAFKA-16628
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> Existing system 
> [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
>  include a test for validating that partitions are not re-assigned when a 
> static member is bounced, but the test design and setup is intended for 
> testing this for the Eager assignment strategy only (based on the eager 
> protocol where all dynamic members revoke their partitions when a rebalance 
> happens). 
> We should considering adding a test that would ensure that partitions are not 
> re-assigned when using the cooperative sticky assignor or the new consumer 
> group protocol assignments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16628:
--

 Summary: Add system test for validating static consumer bounce and 
assignment when not eager
 Key: KAFKA-16628
 URL: https://issues.apache.org/jira/browse/KAFKA-16628
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Lianet Magrans


Existing system 
[test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
 include a test for validating that partitions are not re-assigned when a 
static member is bounced, but the test design and setup is intended for testing 
this for the Eager assignment strategy only (based on the eager protocol where 
all dynamic members revoke their partitions when a rebalance happens). 
We should considering adding a test that would ensure that partitions are not 
re-assigned when using the cooperative sticky assignor or the new consumer 
group protocol assignments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16528) Reset member heartbeat interval when request sent

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16528.
--

> Reset member heartbeat interval when request sent
> -
>
> Key: KAFKA-16528
> URL: https://issues.apache.org/jira/browse/KAFKA-16528
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Member should reset the heartbeat timer when the request is sent, rather than 
> when a response is received. This aims to ensure that we don't add-up to 
> interval any delay there might be in a response. With this, we better respect 
> the contract of members sending HB on the interval to remain in the group, 
> and avoid potential unwanted rebalances.   
> Note that there is already a logic in place to avoid sending a request if a 
> response hasn't been received. So that will ensure that, even with the reset 
> of the interval on the send,  the next HB will only be sent as when the 
> response is received. (Will be sent out on the next poll of the HB manager, 
> and respecting the minimal backoff for sending consecutive requests). This 
> will btw be consistent with how the interval timing & in-flights is handled 
> for auto-commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839096#comment-17839096
 ] 

Lianet Magrans commented on KAFKA-16493:


Hey [~phuctran], any progress on this one? Even though it's a performance 
improvement I'm afraid it's a very sensitive one given that it would affect the 
poll loop, so we need to make sure it makes it into 3.8 (it had the wrong fix 
version before, I just updated it). Let me know if you have any questions. 
Thanks!

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged

2024-04-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16493:
---
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Avoid unneeded subscription regex check if metadata version unchanged
> -
>
> Key: KAFKA-16493
> URL: https://issues.apache.org/jira/browse/KAFKA-16493
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When using pattern subscription (java pattern), the new consumer regularly 
> checks if the list of topics that match the regex has changed. This is done 
> as part of the consumer poll loop, and it evaluates the regex using the 
> latest cluster metadata. As an improvement, we should avoid evaluating the 
> regex if the metadata version hasn't changed (similar to what the legacy 
> coordinator does 
> [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Component/s: clients
 consumer
 system tests

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:59 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new protocol/consumer. You're on the right page regarding the rest: Streams 
tests haven't been migrated yet because it's not integrated with the new 
protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Summary: Update consumer static membership fencing system test to support 
new protocol  (was: Update static membership fencing system test to support new 
protocol)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol

2024-04-16 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826
 ] 

Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:34 PM:
-

Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

Classic protocol refers to the existing group protocol, and Consumer protocol 
refers to the new one introduced with KIP-848 (just using the names proposed to 
be used in the configs to switch between both)


was (Author: JIRAUSER300183):
Hey [~ableegoldman], this is specific to the static membership, and related to 
the consumer system tests only, that were parametrized to run with the legacy 
and new coordinator/consumer. You're on the right page regarding the rest: 
Streams tests haven't been migrated yet because it's not integrated with the 
new protocol. 

> Update static membership fencing system test to support new protocol
> 
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   3   4   5   >