lucasbru commented on code in PR #18652:
URL: https://github.com/apache/kafka/pull/18652#discussion_r1934189843
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;

Review Comment:
Even if we had one assignor for each group, we'd have to be careful, because our `assign` function should not rely on the local state of the previous `assign` call. In particular, the assignor's local state is not persisted anywhere, so we cannot recover it from the `__consumer_offsets` topic after a fail-over.
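To make that constraint concrete, here is a minimal Java sketch. It assumes a hypothetical, simplified `SimpleTaskAssignor` interface (not Kafka's actual `TaskAssignor` API) in which the previous assignment is passed in as an argument, standing in for state read back from `__consumer_offsets`. `LocalStateStickyAssignor` is likewise a made-up name. All helper structures are local to `assign`, so calling it twice on the same instance, or on a fresh instance after a simulated fail-over, yields the same result. The "sticky" logic is deliberately simplified: members keep their previous tasks, and each unassigned task goes to the least-loaded member, with no balancing caps.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified interface for illustration; NOT Kafka's TaskAssignor API.
interface SimpleTaskAssignor {
    Map<String, Set<Integer>> assign(Map<String, Set<Integer>> previousAssignment,
                                     Set<String> members,
                                     int numTasks);
}

// Hypothetical assignor: no mutable fields, so the result depends only on the
// arguments (which, in the real system, would come from persisted group state).
class LocalStateStickyAssignor implements SimpleTaskAssignor {

    @Override
    public Map<String, Set<Integer>> assign(Map<String, Set<Integer>> previousAssignment,
                                            Set<String> members,
                                            int numTasks) {
        // Helper structures are created per call and discarded when it returns,
        // instead of being stored in fields such as a `taskPairs` member.
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new TreeSet<>());
        }
        Set<Integer> assigned = new HashSet<>();

        // Sticky step: members that are still present keep their previous tasks.
        for (Map.Entry<String, Set<Integer>> entry : new TreeMap<>(previousAssignment).entrySet()) {
            Set<Integer> owned = result.get(entry.getKey());
            if (owned == null) {
                continue; // this member has left the group
            }
            for (int task : new TreeSet<>(entry.getValue())) {
                if (task < numTasks && assigned.add(task)) {
                    owned.add(task);
                }
            }
        }

        // Each remaining task goes to the least-loaded member (ties: smallest member id).
        for (int task = 0; task < numTasks; task++) {
            if (assigned.contains(task)) {
                continue;
            }
            String target = null;
            for (Map.Entry<String, Set<Integer>> entry : result.entrySet()) {
                if (target == null || entry.getValue().size() < result.get(target).size()) {
                    target = entry.getKey();
                }
            }
            if (target == null) {
                break; // no members, nothing to assign to
            }
            result.get(target).add(task);
            assigned.add(task);
        }
        return result;
    }
}
```

Because nothing survives between calls, a coordinator that fails over only needs the persisted inputs to reproduce the same assignment.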
We generally try to avoid patterns where the assignment works differently after a fail-over, so all inputs to the assignor must be persisted in the `__consumer_offsets` topic.

The next question is: why not just create a new assignor every time we call `assign`? I think the short answer is that we are doing the same as consumer groups, which don't do this either, and we had no reason to change it. The long answer (I'm speculating somewhat here) is probably that having an assignor live longer than a single heartbeat is the slightly more generic pattern. In the future, the assignor may be plugged with custom, potentially proprietary code (this already exists for consumer groups; KIP-1071 does not define such an interface for Streams yet, but we may add it later). That code may do all kinds of things, for example set up a connection to an external service that performs the assignment, which may require one-time initialization and local state (caches and such). So, while our default assignors such as the sticky assignor are essentially stateless (and should stay stateless), making the assignor interface short-lived would make stateful implementations hard, and I don't believe we want that. With a long-lived interface, we just need to make sure that any local state within the `assign` function is short-lived.
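The long-lived pattern can be sketched in the same hypothetical style. Below, `LongLivedAssignor` (a made-up name, not a Kafka class) receives a `Function<String, Integer>` that stands in for a client to an external service; the assignor is created once and keeps a cache of the service's answers across `assign` calls. The assumption that makes this safe is that the cached values are a pure function of the member id, so losing the cache on fail-over only costs extra lookups, while everything that actually shapes the result is local to `assign`. Ordering members by weight and distributing tasks round-robin is an arbitrary choice for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical long-lived assignor; not a Kafka class.
class LongLivedAssignor {
    // One-time, long-lived state: a stand-in for a client to an external
    // service that would be expensive to set up on every heartbeat.
    private final Function<String, Integer> weightService;
    // Cache of a pure function of its key: it only saves lookups, so losing it
    // after a fail-over cannot change any assignment.
    private final Map<String, Integer> weightCache = new ConcurrentHashMap<>();

    LongLivedAssignor(Function<String, Integer> weightService) {
        this.weightService = weightService;
    }

    Map<String, Set<Integer>> assign(Set<String> members, int numTasks) {
        // Per-call state: built here and dropped when the call returns.
        Map<String, Integer> weights = new TreeMap<>();
        for (String member : members) {
            weights.put(member, weightCache.computeIfAbsent(member, weightService));
        }
        List<String> order = new ArrayList<>(weights.keySet());
        // Sort by weight, then by member id, so the order is deterministic.
        order.sort(Comparator.comparing((String m) -> weights.get(m))
                .thenComparing(Comparator.<String>naturalOrder()));

        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String member : order) {
            result.put(member, new TreeSet<>());
        }
        // Round-robin the tasks over the sorted members.
        for (int task = 0; task < numTasks && !order.isEmpty(); task++) {
            result.get(order.get(task % order.size())).add(task);
        }
        return result;
    }
}
```

Discarding the instance (and with it `weightCache`), as happens after a coordinator fail-over, changes how often `weightService` is called but not the map that `assign` returns.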
