[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842185#comment-17842185
 ] 

Sal Sorrentino commented on KAFKA-16514:


Bug, feature gap; /təˈmeɪ.t̬oʊ/ , /təˈmɑː.təʊ/. I only know I followed the 
programmatic interface provided but could not achieve the advertised results 
without perusing source code to find the undocumented secret sauce ;)


Your point IS taken, however. I am happy to create a KIP if that is the best 
path forward, but am unfamiliar with the process and will need to do some 
reading. I don't think there is an urgent need to get this in 3.8, but I think 
documenting the issue and the workaround is appropriate in the short term.
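
For documentation purposes, the workaround could be written up roughly as below. This is only a sketch: it uses plain string keys so it compiles without the Kafka jars, the class and method names (`LeaveGroupWorkaround`, `buildProps`) are made up for illustration, and `internal.leave.group.on.close` is the undocumented internal key quoted in the issue description, so it may change or disappear between releases.

```java
import java.util.Properties;

// Sketch of the short-term workaround from this ticket (hypothetical helper class).
// In a real application the returned Properties would be handed to KafkaStreams.
class LeaveGroupWorkaround {

    // Internal, undocumented key taken from the issue description.
    static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";

    static Properties buildProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Ask the consumer to send a leave-group request when it is closed.
        props.put(INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("someApplicationId", "localhost:9092");
        System.out.println(INTERNAL_LEAVE_GROUP_ON_CLOSE + " = "
                + props.get(INTERNAL_LEAVE_GROUP_ON_CLOSE));
    }
}
```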

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamsConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842014#comment-17842014
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/29/24 2:55 PM:
-

Less tacky is better. Looking at other consumer client implementations (kgo - 
Golang driver specifically), the process to leave a group is:
 # The member leaving the group sends a rebalance request indicating its 
partitions should be reassigned.
 # The incremental rebalance process takes place, removing the partitions from 
the appropriate member and assigning them to another.
 # The member leaves the group via LeaveGroupRequest with its member id as 
its payload (LeaveGroupRequestMember).
 

If the prior suggestion of using an unsubscribe accomplishes this, that would 
be the way to go, but I'm not sure that it does. Also, I don't believe the 
current Close with leave=true does either. My hope is the abandoned partitions 
would be assigned to the member with the most up to date replica of each 
partition, but I can neither confirm nor deny this ;)  My assumption at the 
moment is that the member leaves immediately regardless of the entire group 
state...it does not wait for another member to build the state in the 
background before leaving (my assumption could be wrong).

The problem I'm having is the scope-splosion of this bug.

Is there a reason we can't pass the member id to the leave group request in the 
absence of a static member id?
{code:java}
String memberId = clientSupplier.consumer().groupMetadata().memberId(); {code}
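
To make the question concrete, here is a rough sketch of the selection logic being asked about: use the static `group.instance.id` when one is configured, otherwise fall back to the coordinator-assigned member id. Everything in it (`LeaveIdentity`, `choose`) is hypothetical and is not Kafka code. It assumes the group instance id is available as an `Optional<String>` and that an empty member id means the member has not joined the group yet.

```java
import java.util.Optional;

// Hypothetical sketch, not Kafka code: decide which identifier a leave-group
// request would carry for a member that is shutting down.
class LeaveIdentity {
    final String kind;  // "group.instance.id" or "member.id"
    final String value;

    LeaveIdentity(String kind, String value) {
        this.kind = kind;
        this.value = value;
    }

    // Prefer the static id when configured; otherwise use the dynamic member id.
    static LeaveIdentity choose(Optional<String> groupInstanceId, String memberId) {
        if (groupInstanceId.isPresent()) {
            return new LeaveIdentity("group.instance.id", groupInstanceId.get());
        }
        if (memberId == null || memberId.isEmpty()) {
            // Assumption: an empty member id means there is nothing to leave yet.
            throw new IllegalStateException("member has not joined the group yet");
        }
        return new LeaveIdentity("member.id", memberId);
    }

    public static void main(String[] args) {
        LeaveIdentity dynamic = choose(Optional.empty(), "consumer-1-abc");
        LeaveIdentity fixed = choose(Optional.of("worker-1"), "consumer-1-abc");
        System.out.println(dynamic.kind + " -> " + dynamic.value);
        System.out.println(fixed.kind + " -> " + fixed.value);
    }
}
```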


was (Author: JIRAUSER305028):
Less tacky is better. Looking at other consumer client implementations (kgo - 
Golang driver specifically), the process to leave a group is:
 # The member leaving the group sends a rebalance request indicating its 
partitions should be reassigned.
 # The incremental rebalance process takes place, removing the partitions from 
the appropriate member and assigning them to another.
 # The member leaves the group via LeaveGroupRequest with its member id as 
its payload (LeaveGroupRequestMember).
 

If the prior suggestion of using an unsubscribe accomplishes this, that would 
be the way to go, but I'm not sure that it does. Also, I don't believe the 
current Close with leave=true does either. My hope is the abandoned partitions 
would be assigned to the member with the most up to date replica of each 
partition, but I can neither confirm nor deny this ;)

The problem I'm having is the scope-splosion of this bug.

Is there a reason we can't pass the member id to the leave group request in the 
absence of a static member id?
{code:java}
String memberId = clientSupplier.consumer().groupMetadata().memberId(); {code}


[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842014#comment-17842014
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/29/24 2:49 PM:
-

Less tacky is better. Looking at other consumer client implementations (kgo - 
Golang driver specifically), the process to leave a group is:
 # The member leaving the group sends a rebalance request indicating its 
partitions should be reassigned.
 # The incremental rebalance process takes place, removing the partitions from 
the appropriate member and assigning them to another.
 # The member leaves the group via LeaveGroupRequest with its member id as 
its payload (LeaveGroupRequestMember).
 

If the prior suggestion of using an unsubscribe accomplishes this, that would 
be the way to go, but I'm not sure that it does. Also, I don't believe the 
current Close with leave=true does either. My hope is the abandoned partitions 
would be assigned to the member with the most up to date replica of each 
partition, but I can neither confirm nor deny this ;)

The problem I'm having is the scope-splosion of this bug.

Is there a reason we can't pass the member id to the leave group request in the 
absence of a static member id?
{code:java}
String memberId = clientSupplier.consumer().groupMetadata().memberId(); {code}


was (Author: JIRAUSER305028):
Less tacky is better. Looking at other consumer client implementations (kgo - 
Golang driver specifically), the process to leave a group is:
 # The member leaving the group sends a rebalance request indicating its 
partitions should be reassigned.
 # The incremental rebalance process takes place, removing the partitions from 
the appropriate member and assigning them to another.
 # The member leaves the group via LeaveGroupRequest with its member id as 
its payload (LeaveGroupRequestMember).
 

If the prior suggestion of using an unsubscribe accomplishes this, that would 
be the way to go, but I'm not sure that it does. Also, I don't believe the 
current Close with leave=true does either. My hope is the abandoned partitions 
would be assigned to the member with the most up to date replica of each 
partition, but I can neither confirm nor deny this ;)

The problem I'm having is the scope-splosion of this bug.

Is there a reason we can't pass the member id to the leave group request in the 
absence of a static member id?

```
String memberId = clientSupplier.consumer().groupMetadata().memberId();
```



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842014#comment-17842014
 ] 

Sal Sorrentino commented on KAFKA-16514:


Less tacky is better. Looking at other consumer client implementations (kgo - 
Golang driver specifically), the process to leave a group is:
 # The member leaving the group sends a rebalance request indicating its 
partitions should be reassigned.
 # The incremental rebalance process takes place, removing the partitions from 
the appropriate member and assigning them to another.
 # The member leaves the group via LeaveGroupRequest with its member id as 
its payload (LeaveGroupRequestMember).
 

If the prior suggestion of using an unsubscribe accomplishes this, that would 
be the way to go, but I'm not sure that it does. Also, I don't believe the 
current Close with leave=true does either. My hope is the abandoned partitions 
would be assigned to the member with the most up to date replica of each 
partition, but I can neither confirm nor deny this ;)

The problem I'm having is the scope-splosion of this bug.

Is there a reason we can't pass the member id to the leave group request in the 
absence of a static member id?

```
String memberId = clientSupplier.consumer().groupMetadata().memberId();
```



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-28 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841670#comment-17841670
 ] 

Sal Sorrentino commented on KAFKA-16514:


Well, this is not entirely accurate. As mentioned in one of my first comments, 
by providing a `group.instance.id`, you can achieve the desired behavior 
without using any "internal" configs. However, I agree that it does seem broken 
that the only way to voluntarily leave a group is to be a static member.
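
As a rough illustration of that alternative, the sketch below adds a `group.instance.id` to an existing configuration so the instance joins as a static member. It is an assumption-laden example rather than a recommended setup: the helper class `StaticMemberConfig`, its method, and the id value are all made up, and plain string keys are used so it compiles without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical helper: copy a base config and mark the instance as a static member.
class StaticMemberConfig {

    static Properties withGroupInstanceId(Properties base, String instanceId) {
        Properties props = new Properties();
        props.putAll(base);
        // ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; per the comment above, with this
        // set, close(new CloseOptions().leaveGroup(true)) leaves the group without
        // relying on any "internal" config.
        props.put("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "someApplicationId");
        Properties props = withGroupInstanceId(base, "streams-instance-1");
        System.out.println(props.get("group.instance.id"));
    }
}
```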



[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:13 AM:
-

IMHO: I think if you have a replication factor of 0 and your application uses 
a persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1. In which case 
an incremental rebalance is probably preferable to a partition blackout during 
an application bounce, especially if you are using something like spring for 
dependency injection as application boot times are not exactly speedy. If you 
are using in memory state stores at all, you would want to leave the group in 
every scenario I would think.

Long story short, I think the "hack" is relevant if you have a replication 
factor of 0.


was (Author: JIRAUSER305028):
IMHO: I think if you have a replication factor of 0 and your application uses 
a persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1, in which case 
an incremental rebalance is probably preferable to a partition blackout during 
an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 



[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:10 AM:
-

IMHO: I think if you have a replication factor of 0 and your application uses 
a persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1, in which case 
an incremental rebalance is probably preferable to a partition blackout during 
an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 


was (Author: JIRAUSER305028):
IMHO: I think if you have a replication factor of 0 and your application uses 
a persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1, in which 
case an incremental rebalance is probably preferable to a partition blackout 
during an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839452#comment-17839452
 ] 

Sal Sorrentino commented on KAFKA-16514:


IMHO: I think if you have a replication factor of 0 and your application uses 
a persistent container (such as a k8s stateful pod) that it "could" be 
relevant...but I think the recommended replication factor is 1, in which 
case an incremental rebalance is probably preferable to a partition blackout 
during an application bounce, especially if you are using spring for dependency 
injection as application boot times are not exactly speedy. 



[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-20 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839240#comment-17839240
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/20/24 1:32 PM:
-

I spent a little time looking at the code, and I really believe this is a 
Streams implementation issue. Every member of a consumer group has some sort of 
"MemberId" associated with it, regardless of whether or not it is considered 
static. It must, otherwise basic functions such as rebalancing could not work. 
If the static group id is not used, some randomly generated member id must be, 
and the real issue is that this identifier is not being tracked in the stream task.

That being said, the current implementation uses Thread scoped variables to 
store the "group.instance.id" and to really fix this, the consumer group member 
id would need to be stored there as well. One quick fix would be to 
auto-generate this id. It seems to me that if the existence of a group 
instance id is necessary to drive fundamental consumer group logic, that one 
should be generated if not supplied by the user. The scope of that sort of 
change is a bit large and not to be taken lightly. 

I would recommend that, in the short term, we add a `GROUP_INSTANCE_ID_CONFIG` 
public static variable to the StreamsConfig class and document thoroughly the 
logic that this config drives.
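
A sketch of what the "auto-generate if not supplied" idea could look like follows. It is purely illustrative: `GroupInstanceIds` and `ensureGroupInstanceId` are invented names, the constant simply mirrors the proposed `GROUP_INSTANCE_ID_CONFIG`, and the `auto-` prefix plus random UUID is an assumed naming scheme.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical sketch of generating a group instance id when the user did not set one.
class GroupInstanceIds {

    // Mirrors the proposed public constant; the key itself is the consumer's
    // existing "group.instance.id" setting.
    static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";

    // Returns the configured id, generating and storing one if it is absent.
    static String ensureGroupInstanceId(Properties props) {
        Object existing = props.get(GROUP_INSTANCE_ID_CONFIG);
        if (existing != null && !existing.toString().isEmpty()) {
            return existing.toString();
        }
        String generated = "auto-" + UUID.randomUUID();
        props.put(GROUP_INSTANCE_ID_CONFIG, generated);
        return generated;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(ensureGroupInstanceId(props));
    }
}
```

One caveat with this approach: an id that is regenerated on every start would not survive a restart, so it would give up the main benefit of static membership during rolling bounces. That is part of why the scope of such a change is not trivial.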

Does that make sense?


was (Author: JIRAUSER305028):
I spent a little time looking at the code, and I really believe this is a 
Streams implementation issue. Every member of a consumer group has some sort of 
"MemberId" associated with it, regardless of whether or not it is considered 
static. It must, otherwise basic functions such as rebalancing could not work. 
If the static group id is not used, some randomly generated member id must be, 
and the real issue is that this identifier is not being tracked in the stream task.

That being said, the current implementation uses Thread scoped variables to 
store the "group.instance.id" and to really fix this, the consumer group member 
id would need to be stored there as well. One quick fix would be to 
auto-generate this id. It seems to me that if the existence of a group 
instance id is necessary to drive fundamental consumer group logic, that one 
should be generated if not supplied by the user. The scope of that sort of 
change is a bit large and not to be taken lightly. 

I would recommend, that in the short term, we add a `GROUP_INSTANCE_ID` public 
static variable to the StreamsConfig class and document thoroughly the logic 
that this config drives.

Does that make sense?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 

[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-20 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839240#comment-17839240
 ] 

Sal Sorrentino commented on KAFKA-16514:


I spent a little time looking at the code, and I really believe this is a 
Streams implementation issue. Every member of a consumer group has some sort of 
"MemberId" associated with it, regardless of whether or not it is considered 
static. It must; otherwise basic functions such as rebalancing could not work. 
If the static group instance id is not used, some randomly generated member id 
must be, and the real issue is that this identifier is not being tracked in the 
stream task.

That being said, the current implementation uses thread-scoped variables to 
store the "group.instance.id", and to really fix this the consumer group member 
id would need to be stored there as well. One quick fix would be to 
auto-generate this id. It seems to me that if the existence of a group instance 
id is necessary to drive fundamental consumer group logic, one should be 
generated if not supplied by the user. The scope of that sort of change is a 
bit large and not to be taken lightly. 

I would recommend that, in the short term, we add a `GROUP_INSTANCE_ID` public 
static variable to the StreamsConfig class and thoroughly document the logic 
that this config drives.

Does that make sense?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836750#comment-17836750
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:06 PM:
--

The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", which hardly seems like static 
membership.
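
For anyone who wants to try the same workaround, a rough sketch follows. The 
class and method names are hypothetical; only the "group.instance.id" key 
comes from Kafka. It copies the existing config and adds a fresh UUID per 
process start, so the id is "static" in name only.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of Kafka: the random-UUID workaround
// described above, applied to a copy of an existing Streams config.
final class RandomInstanceIdWorkaround {

    // Returns a copy of base with a freshly generated "group.instance.id".
    // The caller's Properties object is left untouched.
    static Properties withRandomInstanceId(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("group.instance.id", UUID.randomUUID().toString());
        return props;
    }
}
```

Because the id changes on every start, a restarted instance will not be 
recognized as the same static member, which is exactly why this feels like a 
misuse of the feature.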

I can certainly pick this up, but no promises on the expediency ;)


was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", which hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)



[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836750#comment-17836750
 ] 

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:03 PM:
--

The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown.

I would consider this a bug as well. As evidence, my latest workaround involves 
using a random UUID as the "group.instance.id", which hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)


was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836750#comment-17836750
 ] 

Sal Sorrentino commented on KAFKA-16514:


The KIP mentions nothing about static membership. I would also add that the 
current behavior seems to solve the wrong use case. A static member with 
persistent state is more likely to want to keep membership alive, while a 
member with transient/non-persistent state would want to relinquish membership 
on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836644#comment-17836644
 ] 

Sal Sorrentino commented on KAFKA-16514:


Digging further into this, it seems the leaveGroup option is only supported if 
the "group.instance.id" is supplied via the StreamsConfig; however, there is no 
documentation around this. The "group.instance.id" has no representation in the 
StreamsConfig class, even though it is intended to be a user-supplied 
configuration providing static membership.

It is unclear to me why this flag is only supported for static membership, 
since a consumer group member can "leave" a group at any point in time without 
static membership.
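
To spell out the behavior I am describing, here is a tiny model of it. This is 
not the Kafka source; the class and method names are hypothetical and it only 
encodes what I observed: the leave request appears to be sent only when both 
the option is set and a group instance id is configured.

```java
import java.util.Optional;

// Hypothetical model of the observed behavior, not Kafka's implementation.
final class LeaveGroupModel {

    // True only when close(options) would actually remove the member:
    // leaveGroup(true) was requested AND "group.instance.id" is configured.
    static boolean leavesGroupOnClose(boolean leaveGroupOption,
                                      Optional<String> groupInstanceId) {
        return leaveGroupOption && groupInstanceId.isPresent();
    }
}
```

Under this model, the sample from the issue description (leaveGroup(true) with 
no group instance id) evaluates to false, which matches the delayed rebalance 
we see in practice.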



[jira] [Updated] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sal Sorrentino updated KAFKA-16514:
---
Affects Version/s: 3.7.0



[jira] [Updated] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sal Sorrentino updated KAFKA-16514:
---
Description: 
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

When attempting to shut down a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:
{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}
The expected behavior here is that the group member would shut down and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens only after the appropriate timeout configuration has expired.

I understand the default behavior: it assumes that any associated StateStores 
are persisted to disk, so that in the case of a rolling restart/deployment the 
rebalance delay may be preferable. However, in our application we are using 
in-memory state stores and standby replicas. There is no benefit in delaying 
the rebalance in this setup, and we need a way to force a member to leave the 
group when shutting down.

The workaround we found is to set an undocumented internal StreamsConfig 
setting to enforce this behavior:
{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

Additional configuration details:
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
props.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);{code}
 

  was:
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:
{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}
The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that any 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.

The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:
{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

Additional configuration details:
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
props.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);{code}
 



[jira] [Updated] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sal Sorrentino updated KAFKA-16514:
---
Description: 
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:
{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}
The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that any 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.

The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:
{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

Additional configuration details:
{code:java}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
props.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);{code}
 

  was:
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:
{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}
The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that any 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.

The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:
{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

 

 


> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
>  
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.

[jira] [Updated] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sal Sorrentino updated KAFKA-16514:
---
Priority: Minor  (was: Major)


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sal Sorrentino updated KAFKA-16514:
---
Description: 
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:
{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}
The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that any 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.

The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:
{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

 

 

  was:
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:


{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}

The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that ant 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.



The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:


{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

 

 





[jira] [Created] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-11 Thread Sal Sorrentino (Jira)
Sal Sorrentino created KAFKA-16514:
--

    Summary: Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
        Key: KAFKA-16514
        URL: https://issues.apache.org/jira/browse/KAFKA-16514
    Project: Kafka
 Issue Type: Bug
 Components: streams
   Reporter: Sal Sorrentino


Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

 

When attempting to shutdown a streams application and leave the associated 
consumer group, the supplied `leaveGroup` option seems to have no effect. 
Sample code:


{code:java}
CloseOptions options = new CloseOptions().leaveGroup(true);
stream.close(options);{code}

The expected behavior here is that the group member would shutdown and leave 
the group, immediately triggering a consumer group rebalance. In practice, the 
rebalance happens after the appropriate timeout configuration has expired.

I understand the default behavior in that there is an assumption that any 
associated StateStores would be persisted to disk and that in the case of a 
rolling restart/deployment, the rebalance delay may be preferable. However, in 
our application we are using in-memory state stores and standby replicas. There 
is no benefit in delaying the rebalance in this setup and we are in need of a 
way to force a member to leave the group when shutting down.



The workaround we found is to set an undocumented internal StreamConfig to 
enforce this behavior:


{code:java}
props.put("internal.leave.group.on.close", true);
{code}
To state the obvious, this is less than ideal.

 

 


