[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744472#comment-17744472
 ] 

Joe Wreschnig commented on KAFKA-15190:
---------------------------------------

[~mjsax], setting {{client.id}} doesn't seem to be sufficient, I still observe 
lots of task shuffling when using deterministic client IDs; when I inject a 
fake process ID via the file. (In both I've also set acceptable.recovery.lag to 
MAX_VALUE so the assignments should be as otherwise "stateless" as possible, as 
far as I understand the algorithm.) I'm not too familiar with the code but 
although {{StreamsPartitionAssignor}} sometimes calls it a client ID and 
sometimes a process ID it's a {{UUID}} so I assume it really is the process ID. 
I'm also not sure the {{client.id}} *must* be unique so trying to reuse it 
as-is for this may not work.

[~ableegoldman] I'm reluctant to put together a KIP as I'm not sure how much 
time I will have for "getting the work done" reasonably soon. The meat of the 
proposal would be:
{quote}One new configuration option is added and another's behavior is modified:
 - A new streams configuration option {{process.id}} is added which may be set 
to a UUID; if provided it overrides the use of {{group.instance.id}}
and the generation of a random process ID.
 - If {{process.id}} is not set and a static membership ID has been set (i.e. 
using {{StreamsConfig.consumerPrefix}} with {{group.instance.id}), the process 
ID is generated deterministically using the application ID and that 
{{{}group.instance.id{}}}.

The presence of a persisted {{kafka-streams-process-metadata}} file overrides 
these options; and if none of these options are provided and no file is found a 
new random process ID is generated.
{quote}

If only introducing {{process.id}} requires a KIP, reusing 
{{group.instance.id}} is really all we need to solve our specific issue. The 
workaround I have at the moment is:
{code:java}
    public static void injectProcessID(String stateDir, Properties config) {
        String appID = config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
        String instanceID = 
config.getProperty(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));

        if (appID == null || instanceID == null) {
            logger.warn("application and instance IDs must be set for a stable 
process ID");
            return;
        }

        UUID processID = UUID.nameUUIDFromBytes​((appID + "\000" + 
instanceID).getBytes());
        // ... write the file ...
{code}

> Allow configuring a streams process ID
> --------------------------------------
>
>                 Key: KAFKA-15190
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15190
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Joe Wreschnig
>            Priority: Major
>              Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting process ID the state 
> directly in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to