[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-12-12 Thread Joe Wreschnig (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795780#comment-17795780
 ] 

Joe Wreschnig commented on KAFKA-15190:
---

Sorry for the long delay in answering. I am no longer working on the projects 
which need this change but I will ask the team who still is whether they can 
take up preparing the PR, as I know they are still using the workaround I gave 
above.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the 
> state directory (KAFKA-10716) does not help us avoid shuffling lots of tasks 
> during restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746236#comment-17746236
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


[~jwreschnig] I'm sure we can find someone to pick up the KIP for a 
full-fledged fix like I proposed, so no worries – it seems reasonable to me 
for us to just reuse the static membership group.instance.id for the time 
being, if it's set, and punt on the generalized feature for now. Would you be 
interested in just doing a small PR for this case instead? Happy to review such 
a thing if so.



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744726#comment-17744726
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

{quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID 
and sometimes a process ID it's a {{UUID}} so I assume it really is the process 
ID.
{quote}
Thanks for calling this out. You are right; I missed this point.

As you did mention "max recovery lag", I assume you have a stateful app that 
uses in-memory stores only?

Another thing coming to my mind: the `client.id` actually has a different 
purpose and should not be unique per `KafkaStreams` instance, but should be the 
_same_ for all instances (the name is a little bit misleading). For example, if 
you configure quotas, they are based on `client.id`, and you usually want quotas 
to be set per application, not per instance.
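
To make the distinction concrete, here is a minimal sketch of how the two IDs might be set side by side. It uses plain string keys rather than the Kafka constants so it runs without the Kafka libraries; the class name, the helper {{idConfig}}, the "-client" suffix, and the sample values are illustrative assumptions, not anything Kafka Streams prescribes.

```java
import java.util.Properties;

/** Sketch: one client.id per application, one group.instance.id per instance. */
final class StreamsIdConfigSketch {

    /**
     * Builds only the ID-related settings for one instance.
     * The podName parameter and the "-client" suffix are hypothetical.
     */
    static Properties idConfig(String applicationId, String podName) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        // Same value on every instance, so client.id-based quotas cover the whole application.
        props.setProperty("client.id", applicationId + "-client");
        // Unique and stable per instance; the "consumer." prefix is what
        // StreamsConfig.consumerPrefix(...) would produce for this key.
        props.setProperty("consumer.group.instance.id", podName);
        return props;
    }

    public static void main(String[] args) {
        Properties first = idConfig("orders-app", "orders-app-0");
        Properties second = idConfig("orders-app", "orders-app-1");
        System.out.println("client.id shared: "
            + first.getProperty("client.id").equals(second.getProperty("client.id")));
        System.out.println("group.instance.id shared: "
            + first.getProperty("consumer.group.instance.id")
                .equals(second.getProperty("consumer.group.instance.id")));
    }
}
```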



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-19 Thread Joe Wreschnig (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744472#comment-17744472
 ] 

Joe Wreschnig commented on KAFKA-15190:
---

[~mjsax], setting {{client.id}} doesn't seem to be sufficient: I still observe 
lots of task shuffling when using deterministic client IDs, but not when I 
inject a fake process ID via the file. (In both cases I've also set 
{{acceptable.recovery.lag}} to MAX_VALUE so the assignments should otherwise be 
as "stateless" as possible, as far as I understand the algorithm.) I'm not too 
familiar with the code, but although {{StreamsPartitionAssignor}} sometimes 
calls it a client ID and sometimes a process ID, it's a {{UUID}}, so I assume it 
really is the process ID. I'm also not sure the {{client.id}} *must* be unique, 
so trying to reuse it as-is for this may not work.

[~ableegoldman] I'm reluctant to put together a KIP as I'm not sure how much 
time I will have for "getting the work done" reasonably soon. The meat of the 
proposal would be:
{quote}One new configuration option is added and another's behavior is modified:
 - A new streams configuration option {{process.id}} is added which may be set 
to a UUID; if provided, it overrides the use of {{group.instance.id}} and the 
generation of a random process ID.
 - If {{process.id}} is not set and a static membership ID has been set (i.e. 
using {{StreamsConfig.consumerPrefix}} with {{group.instance.id}}), the process 
ID is generated deterministically using the application ID and that 
{{group.instance.id}}.

The presence of a persisted {{kafka-streams-process-metadata}} file overrides 
these options; and if none of these options are provided and no file is found a 
new random process ID is generated.
{quote}
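
The precedence described in the quote above could be sketched roughly as follows. This is only an illustration of the proposed order (persisted file, then {{process.id}}, then a value derived from the application ID and {{group.instance.id}}, then random); the class and method names are hypothetical and nothing here is existing Kafka Streams code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;

/** Sketch of the proposed process ID resolution order; not existing Kafka Streams code. */
final class ProcessIdResolutionSketch {

    /**
     * @param persisted       process ID read from the metadata file, if one exists
     * @param configured      value of the proposed process.id option, if set
     * @param applicationId   the application.id
     * @param groupInstanceId the static membership ID, if set
     */
    static UUID resolve(Optional<UUID> persisted,
                        Optional<UUID> configured,
                        String applicationId,
                        Optional<String> groupInstanceId) {
        // 1. A persisted file overrides every configuration option.
        if (persisted.isPresent()) {
            return persisted.get();
        }
        // 2. An explicit process.id overrides derivation and random generation.
        if (configured.isPresent()) {
            return configured.get();
        }
        // 3. With static membership, derive a deterministic ID from both IDs.
        if (groupInstanceId.isPresent()) {
            String name = applicationId + "\000" + groupInstanceId.get();
            return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
        }
        // 4. Otherwise fall back to a new random ID.
        return UUID.randomUUID();
    }

    public static void main(String[] args) {
        UUID derived = resolve(Optional.empty(), Optional.empty(),
            "orders-app", Optional.of("orders-app-0"));
        System.out.println("derived process ID: " + derived);
    }
}
```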

If only introducing {{process.id}} requires a KIP, reusing 
{{group.instance.id}} is really all we need to solve our specific issue. The 
workaround I have at the moment is:
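{code:java}
// Requires java.nio.charset.StandardCharsets, java.util.Properties, java.util.UUID,
// org.apache.kafka.clients.consumer.ConsumerConfig and org.apache.kafka.streams.StreamsConfig.
public static void injectProcessID(String stateDir, Properties config) {
    String appID = config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
    String instanceID = config.getProperty(
        StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));

    if (appID == null || instanceID == null) {
        logger.warn("application and instance IDs must be set for a stable process ID");
        return;
    }

    // Name-based UUID: the same application/instance pair always yields the same ID.
    UUID processID = UUID.nameUUIDFromBytes(
        (appID + "\000" + instanceID).getBytes(StandardCharsets.UTF_8));
    // ... write the file ...
}
{code}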



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743313#comment-17743313
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I would be a bit hesitant to overly rely on the group.instance.id because not 
everyone uses or wants static membership, and that config is completely coupled 
to the feature. Perhaps we can reuse the group.instance.id as the process id 
only if/when static membership is already being used, which would not 
necessarily even require a KIP (maybe), but we'd still need to introduce a new 
config for the general use case.

It's a bummer because of course, practically speaking, this new config would 
have exactly the same meaning as the group.instance.id – a unique, persistent 
identifier for each client. It would have been the perfect config for this use 
case if not for Kafka's habit of being overly clever about reusing configs to 
enable/disable the related feature, in addition to their actual usage.



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17743311#comment-17743311
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I'm all for this, though it will need a KIP. Would you be interested in writing 
one? Happy to help you with the process if so.

In the meantime, perhaps you guys can get some relief by just writing this 
processId directly upon setup, before starting the Streams app? I believe it 
just expects a plain UUID at the moment, so you should be able to write a 
function that hashes this container id to something of that form and then 
persist it to disk in exactly the same way as Streams.
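
A minimal sketch of the hashing step might look like the following. It assumes the container ID is available as a string (here read from the HOSTNAME environment variable, which is an assumption about the deployment) and uses a name-based UUID so the result is stable across restarts. The class and method names are hypothetical, and persisting the value to disk is deliberately left out, since it has to match whatever Streams itself writes.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Sketch: turn a stable container ID into a stable UUID-shaped process ID. */
final class ContainerIdHashSketch {

    /** Maps a container ID (for example a StatefulSet pod name) to a name-based UUID. */
    static UUID toProcessId(String containerId) {
        // nameUUIDFromBytes is deterministic: equal input bytes give an equal UUID.
        return UUID.nameUUIDFromBytes(containerId.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // HOSTNAME is assumed to carry the pod name; the fallback is only a sample value.
        String containerId = System.getenv().getOrDefault("HOSTNAME", "my-streams-app-0");
        System.out.println(toProcessId(containerId));
    }
}
```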
