[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841986#comment-17841986
 ] 

Lianet Magrans commented on KAFKA-16514:
----------------------------------------

If my understanding here is right, this seems like a fair requirement to have 
the ability to dynamically decide if a member should leave the group, 
regardless of its dynamic/static membership. We've actually had conversations 
about this while working on the new consumer group protocol, where static 
members do send a leave group request when leaving, and they send it with an 
epoch that explicitly indicates it's a temporary leaving (-2). For now this is 
the only way static members leave (temporarily), but the ground is set if ever 
in the future we decide that we want to allow static members to send a 
definitive leave group (ex. -1 epoch, like dynamic members do). So with this 
context, and back to the streams situation here, I wonder if we would be better 
overall if we bring a less hacky solution and enable a close with richer 
semantics at the consumer level, that would allow to have a close where 
options/params could be passed to dynamically indicate how to close 
(temporarily, definitely, ...), and then align this nicely with the new 
protocol (and make it work with the legacy one). Thoughts?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sal Sorrentino
>            Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>         StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to