[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743311#comment-17743311 ]

A. Sophie Blee-Goldman commented on KAFKA-15190:

I'm all for this, though it will need a KIP. Would you be interested in writing one? Happy to help you with the process if so.

In the meantime, perhaps you guys can get some relief by just writing this processId directly upon setup, before starting the Streams app? I believe it just expects a plain UUID at the moment, so you should be able to write a function that hashes this container ID to something of that form and then persist it to disk in exactly the same way as Streams.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Reporter: Joe Wreschnig
> Priority: Major
>
> We run our Kafka Streams applications in containers with no persistent
> storage, and therefore the mitigation of persisting the process ID in the state
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during
> restarts.
> However, we do have a persistent container ID (from a Kubernetes
> StatefulSet). Would it be possible to expose a configuration option to let us
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense
> to have the process ID be automatically derived from this (plus
> application/client IDs) if it's set? The two IDs seem to have overlapping
> goals of identifying "this consumer" across restarts.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
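The suggestion above to hash the container ID into a plain UUID could look roughly like the sketch below. It covers only the hashing step; the class name, method name, and the example pod name are hypothetical, and how Streams expects the value to be laid out on disk is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class ContainerProcessId {
    // Hypothetical helper: maps a stable container ID (for example a
    // StatefulSet pod name) to a UUID that is identical on every restart.
    static UUID fromContainerId(String containerId) {
        // nameUUIDFromBytes builds a name-based (version 3) UUID, so the
        // same input bytes always produce the same UUID.
        return UUID.nameUUIDFromBytes(containerId.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // "my-streams-app-0" is an assumed pod name, used only for illustration.
        UUID first = fromContainerId("my-streams-app-0");
        UUID second = fromContainerId("my-streams-app-0");
        System.out.println(first + " stable across calls: " + first.equals(second));
    }
}
```

A name-based UUID is used here instead of UUID.randomUUID() because the value has to survive a restart without being stored anywhere.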
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743313#comment-17743313 ]

A. Sophie Blee-Goldman commented on KAFKA-15190:

I would be a bit hesitant to rely too heavily on the group.instance.id because not everyone uses or wants static membership, and that config is completely coupled to the feature. Perhaps we can reuse the group.instance.id as the process ID only if/when static membership is already being used, which would not necessarily even require a KIP (maybe), but we'd still need to introduce a new config for the general use case.

It's a bummer because of course, practically speaking, this new config would have exactly the same meaning as the group.instance.id: a unique, persistent identifier for each client. It would have been the perfect config for this use case if not for Kafka's habit of being overly clever about reusing configs to enable/disable the related feature, in addition to their actual usage.
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744288#comment-17744288 ]

Matthias J. Sax commented on KAFKA-15190:

One more thing: the `process.id` is actually only used as part of the `client.id` if no `client.id` config is set. Hence, setting the `client.id` should avoid the issue of rebalancing (and task shuffling)?

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Reporter: Joe Wreschnig
> Priority: Major
> Labels: needs-kip
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744472#comment-17744472 ]

Joe Wreschnig commented on KAFKA-15190:

[~mjsax], setting {{client.id}} doesn't seem to be sufficient: I still observe lots of task shuffling when using deterministic client IDs, unlike when I inject a fake process ID via the file. (In both cases I've also set acceptable.recovery.lag to MAX_VALUE so the assignments should otherwise be as "stateless" as possible, as far as I understand the algorithm.)

I'm not too familiar with the code but although {{StreamsPartitionAssignor}} sometimes calls it a client ID and sometimes a process ID it's a {{UUID}} so I assume it really is the process ID. I'm also not sure the {{client.id}} *must* be unique, so trying to reuse it as-is for this may not work.

[~ableegoldman] I'm reluctant to put together a KIP as I'm not sure how much time I will have for "getting the work done" reasonably soon. The meat of the proposal would be:

{quote}
One new configuration option is added and another's behavior is modified:
- A new streams configuration option {{process.id}} is added which may be set to a UUID; if provided it overrides the use of {{group.instance.id}} and the generation of a random process ID.
- If {{process.id}} is not set and a static membership ID has been set (i.e. using {{StreamsConfig.consumerPrefix}} with {{group.instance.id}}), the process ID is generated deterministically using the application ID and that {{group.instance.id}}.

The presence of a persisted {{kafka-streams-process-metadata}} file overrides these options; and if none of these options are provided and no file is found, a new random process ID is generated.
{quote}

If only introducing {{process.id}} requires a KIP, reusing {{group.instance.id}} is really all we need to solve our specific issue.
The workaround I have at the moment is:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Derives a stable process ID from the application ID and the static membership ID.
public static void injectProcessID(String stateDir, Properties config) {
    String appID = config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
    String instanceID = config.getProperty(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
    if (appID == null || instanceID == null) {
        logger.warn("application and instance IDs must be set for a stable process ID");
        return;
    }
    // Explicit charset so the derived UUID does not depend on the platform default.
    UUID processID = UUID.nameUUIDFromBytes((appID + "\000" + instanceID).getBytes(StandardCharsets.UTF_8));
    // ... write the file ...
}
{code}
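The precedence described in the quoted proposal above (persisted file first, then an explicit process.id, then an ID derived from the application ID and group.instance.id, then a random ID) can be sketched as below. This is only an illustration of the proposed ordering, not code from Kafka Streams; the class, method, and parameter names are hypothetical, and null stands for "not set" or "no file found".

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class ProcessIdResolution {
    // Hypothetical resolver for the proposed ordering. Any argument except
    // applicationId may be null, meaning that source is absent.
    static UUID resolve(UUID persisted, UUID configured,
                        String applicationId, String groupInstanceId) {
        if (persisted != null) {
            return persisted;            // a persisted file overrides everything
        }
        if (configured != null) {
            return configured;           // explicit process.id comes next
        }
        if (groupInstanceId != null) {
            // Deterministic derivation, mirroring the workaround above.
            String name = applicationId + "\000" + groupInstanceId;
            return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
        }
        return UUID.randomUUID();        // nothing configured: random ID
    }

    public static void main(String[] args) {
        // "my-app" and "pod-0" are made-up values for illustration.
        System.out.println(resolve(null, null, "my-app", "pod-0"));
        System.out.println(resolve(null, null, "my-app", null));
    }
}
```

Keeping the persisted file at the top of the order means that instances which already have a stored process ID would keep it after such a change.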
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744726#comment-17744726 ]

Matthias J. Sax commented on KAFKA-15190:

{quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID and sometimes a process ID it's a {{UUID}} so I assume it really is the process ID.
{quote}
Thanks for calling this out. You are right; I missed this point.

As you mentioned "max recovery lag", I assume you have a stateful app that uses in-memory stores only?

Another thing that comes to mind: the `client.id` actually has a different purpose and should not be unique per `KafkaStreams` instance, but should be the _same_ for all instances (the name is a little bit misleading). For example, if you configure quotas, they are based on `client.id`, and you usually want quotas to be set per application, not per instance.
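Following the comment above, a setup where `client.id` is shared by all instances while the static membership ID differs per instance might be configured as in this sketch. The config keys are written as plain strings ("consumer." being the prefix that `StreamsConfig.consumerPrefix` adds); the application name "orders-app" and the pod names are made up for illustration.

```java
import java.util.Properties;

final class StreamsIdsExample {
    // Builds the settings for one instance. podName is a hypothetical
    // stable per-instance name, e.g. from a Kubernetes StatefulSet.
    static Properties configFor(String podName) {
        Properties props = new Properties();
        props.setProperty("application.id", "orders-app");
        // Same client.id on every instance, so quotas apply per application.
        props.setProperty("client.id", "orders-app");
        // Static membership ID: unique and stable per instance.
        props.setProperty("consumer.group.instance.id", podName);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(configFor("orders-app-0"));
        System.out.println(configFor("orders-app-1"));
    }
}
```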
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746236#comment-17746236 ]

A. Sophie Blee-Goldman commented on KAFKA-15190:

[~jwreschnig] I'm sure we can find someone to pick up the KIP for a full-fledged fix like I proposed, so no worries. It seems reasonable to me for us to just reuse the static membership group.instance.id for the time being, if it's set, and punt on the generalized feature for now. Would you be interested in just doing a small PR for this case instead? Happy to review such a thing if so.
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795780#comment-17795780 ]

Joe Wreschnig commented on KAFKA-15190:

Sorry for the long delay in answering. I am no longer working on the projects which need this change but I will ask the team who still is whether they can take up preparing the PR, as I know they are still using the workaround I gave above.