[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841369#comment-17841369
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:
------------------------------------------------

The immutable internal config thing is definitely a bummer.

To recap: if we want to solve this so that the current Streams API – ie the 
#close(closeOptions) API – works as intended, ie for non-static members as 
well, we'd need to change the way the consumer works. Or wait for mutable 
configs, which would be nice, but realistically that's not happening soon 
enough.

To do this "right" we'd probably need to introduce a new public consumer API of 
some sort which would mean going through a KIP which could be a bit messy.

But as a slightly-hacky alternative, would it be possible to just introduce an 
internal API that works similar to the effect of the existing internal config, 
and just have Kafka Streams use that internal API without making it a "real" 
API and having to do a KIP? I mean that's basically what the internal config is 
anyways – an internal config not exposed to/intended for use by consumer 
applications and only introduced for Kafka Streams to use. Doesn't seem that 
big a deal to just switch from this immutable config to a new internal overload 
of #close (or even an internal #leaveGroupOnClose API that can be toggled 
on/off). Thoughts?

[~mjsax] [~cadonna] maybe you can raise this with someone who works on the 
clients to see if there are any concerns/make sure no one would object to this 
approach?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sal Sorrentino
>            Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>         StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to