[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842183#comment-17842183
 ] 

Matthias J. Sax commented on KAFKA-16514:
-----------------------------------------

I would not call it a bug, but more of a feature gap. Historically, 
consumer.close() has sent a leave group request. For Kafka Streams (KS), we 
only added an internal config to the consumer to disable sending that request.

Independently, static group membership was added, and a new admin API was added 
to remove a static member from a group. The purpose of this admin feature was 
(IIRC) to allow triggering a rebalance for static groups (which usually run 
with high session timeouts) in the case of a crashed member that won't come 
back again.
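
For reference, a minimal sketch of the consumer settings that make a member 
static. It uses only java.util.Properties with the plain string keys 
group.instance.id and session.timeout.ms; the class name, helper name, and 
values are made up for illustration.

```java
import java.util.Properties;

// Illustrative sketch only: the class, method, and values are hypothetical.
final class StaticMemberConfigSketch {

    // Builds consumer properties for a static member. Setting group.instance.id
    // is what makes the member static; the session timeout is typically set
    // high so that a quick restart does not trigger a rebalance.
    static Properties staticMemberProps(String instanceId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("group.instance.id", instanceId);
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = staticMemberProps("worker-1", 300_000);
        System.out.println(props.getProperty("group.instance.id"));
        System.out.println(props.getProperty("session.timeout.ms"));
    }
}
```

If such a member crashes for good, the group only notices once the session 
timeout expires. That is the wait the admin API 
(Admin#removeMembersFromConsumerGroup, as far as I recall) is meant to shortcut.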

Thus, these are two independent features that just don't play nicely with each 
other. In addition, we combine both features in KafkaStreams#close(CloseOptions), 
but given how the APIs are built, it only works for static members.

So there is no real reason for the current behavior; it just happens that it 
was all implemented this way given the historical context.
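
A rough way to picture the interaction described above is a small decision 
function. This is a simplified model, not Kafka code: the class and method 
names are hypothetical, and it assumes (my understanding, not something 
confirmed in this ticket) that a static member never sends a leave group 
request on consumer.close() and is only removed through the admin API.

```java
// Simplified model of the behavior discussed in this comment; not Kafka code.
final class LeaveOnCloseModel {

    // Returns true if closing the member is expected to make it leave the group
    // right away, rather than after the session timeout expires.
    //
    // staticMember:        the consumer has a group.instance.id
    // leaveGroupOption:    CloseOptions.leaveGroup(true) was passed to KafkaStreams#close
    // internalLeaveConfig: "internal.leave.group.on.close" is true (Streams disables it)
    static boolean leavesImmediately(boolean staticMember,
                                     boolean leaveGroupOption,
                                     boolean internalLeaveConfig) {
        if (staticMember) {
            // Assumption: static members are only removed via the admin API,
            // which is the path KafkaStreams#close(CloseOptions) takes.
            return leaveGroupOption;
        }
        // Dynamic member: only the consumer's own leave group request applies,
        // and that is controlled by the internal config, not by CloseOptions.
        return internalLeaveConfig;
    }

    public static void main(String[] args) {
        // The case from this ticket: dynamic member, leaveGroup(true), default config.
        System.out.println(leavesImmediately(false, true, false));
        // The reporter's workaround: internal config set to true.
        System.out.println(leavesImmediately(false, false, true));
        // Static member with leaveGroup(true).
        System.out.println(leavesImmediately(true, true, false));
    }
}
```

In this model the leaveGroup option is simply ignored for dynamic members, 
which matches the behavior reported in the ticket.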

I am in favor of doing a KIP to add something similar to "CloseOptions" to 
Consumer#close() (independent of static membership or not). [~sal.sorrentino] 
Would you be interested in picking this up and writing a KIP? We might still be 
able to get it into the 3.8 release if we hurry.

[~lianetm], what is the purpose of the -2 code? In the end, if we do not send 
any request, then with a large enough session timeout no rebalance would be 
triggered anyway? What changes if we send -2 instead of just not sending any 
leave group request on close()?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-16514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sal Sorrentino
>            Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>         StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>         "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
