[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744472#comment-17744472 ]
Joe Wreschnig commented on KAFKA-15190: --------------------------------------- [~mjsax], setting {{client.id}} doesn't seem to be sufficient, I still observe lots of task shuffling when using deterministic client IDs; when I inject a fake process ID via the file. (In both I've also set acceptable.recovery.lag to MAX_VALUE so the assignments should be as otherwise "stateless" as possible, as far as I understand the algorithm.) I'm not too familiar with the code but although {{StreamsPartitionAssignor}} sometimes calls it a client ID and sometimes a process ID it's a {{UUID}} so I assume it really is the process ID. I'm also not sure the {{client.id}} *must* be unique so trying to reuse it as-is for this may not work. [~ableegoldman] I'm reluctant to put together a KIP as I'm not sure how much time I will have for "getting the work done" reasonably soon. The meat of the proposal would be: {quote}One new configuration option is added and another's behavior is modified: - A new streams configuration option {{process.id}} is added which may be set to a UUID; if provided it overrides the use of {{group.instance.id}} and the generation of a random process ID. - If {{process.id}} is not set and a static membership ID has been set (i.e. using {{StreamsConfig.consumerPrefix}} with {{group.instance.id}), the process ID is generated deterministically using the application ID and that {{{}group.instance.id{}}}. The presence of a persisted {{kafka-streams-process-metadata}} file overrides these options; and if none of these options are provided and no file is found a new random process ID is generated. {quote} If only introducing {{process.id}} requires a KIP, reusing {{group.instance.id}} is really all we need to solve our specific issue. The workaround I have at the moment is: {code:java} public static void injectProcessID(String stateDir, Properties config) { String appID = config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); String instanceID = config.getProperty(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)); if (appID == null || instanceID == null) { logger.warn("application and instance IDs must be set for a stable process ID"); return; } UUID processID = UUID.nameUUIDFromBytes​((appID + "\000" + instanceID).getBytes()); // ... write the file ... {code} > Allow configuring a streams process ID > -------------------------------------- > > Key: KAFKA-15190 > URL: https://issues.apache.org/jira/browse/KAFKA-15190 > Project: Kafka > Issue Type: Wish > Components: streams > Reporter: Joe Wreschnig > Priority: Major > Labels: needs-kip > > We run our Kafka Streams applications in containers with no persistent > storage, and therefore the mitigation of persisting process ID the state > directly in KAFKA-10716 does not help us avoid shuffling lots of tasks during > restarts. > However, we do have a persistent container ID (from a Kubernetes > StatefulSet). Would it be possible to expose a configuration option to let us > set the streams process ID ourselves? > We are already using this ID as our group.instance.id - would it make sense > to have the process ID be automatically derived from this (plus > application/client IDs) if it's set? The two IDs seem to have overlapping > goals of identifying "this consumer" across restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)