[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sal Sorrentino updated KAFKA-16514: ----------------------------------- Description: Working with Kafka Streams 3.7.0, but may affect earlier versions as well. When attempting to shutdown a streams application and leave the associated consumer group, the supplied `leaveGroup` option seems to have no effect. Sample code: {code:java} CloseOptions options = new CloseOptions().leaveGroup(true); stream.close(options);{code} The expected behavior here is that the group member would shutdown and leave the group, immediately triggering a consumer group rebalance. In practice, the rebalance happens after the appropriate timeout configuration has expired. I understand the default behavior in that there is an assumption that any associated StateStores would be persisted to disk and that in the case of a rolling restart/deployment, the rebalance delay may be preferable. However, in our application we are using in-memory state stores and standby replicas. There is no benefit in delaying the rebalance in this setup and we are in need of a way to force a member to leave the group when shutting down. The workaround we found is to set an undocumented internal StreamConfig to enforce this behavior: {code:java} props.put("internal.leave.group.on.close", true); {code} To state the obvious, this is less than ideal. Additional configuration details: {code:java} Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); props.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);{code} was: Working with Kafka Streams 3.7.0, but may affect earlier versions as well. When attempting to shutdown a streams application and leave the associated consumer group, the supplied `leaveGroup` option seems to have no effect. Sample code: {code:java} CloseOptions options = new CloseOptions().leaveGroup(true); stream.close(options);{code} The expected behavior here is that the group member would shutdown and leave the group, immediately triggering a consumer group rebalance. In practice, the rebalance happens after the appropriate timeout configuration has expired. I understand the default behavior in that there is an assumption that any associated StateStores would be persisted to disk and that in the case of a rolling restart/deployment, the rebalance delay may be preferable. However, in our application we are using in-memory state stores and standby replicas. There is no benefit in delaying the rebalance in this setup and we are in need of a way to force a member to leave the group when shutting down. The workaround we found is to set an undocumented internal StreamConfig to enforce this behavior: {code:java} props.put("internal.leave.group.on.close", true); {code} To state the obvious, this is less than ideal. Additional configuration details: {code:java} Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); props.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);{code} > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > ----------------------------------------------------------------------------------- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.7.0 > Reporter: Sal Sorrentino > Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)