lucasbru commented on code in PR #18652:
URL: https://github.com/apache/kafka/pull/18652#discussion_r1934189843


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;

Review Comment:
   Even if we had one assignor for each group, we'd have to be careful, because
our `assign` function should not rely on local state left over from the previous
`assign` call. In particular, the assignor's local state is not persisted
anywhere, so we cannot recover it from the `__consumer_offsets` topic after a
fail-over. We generally try to avoid patterns where the assignment works
differently after a fail-over, so all inputs to the assignor must be persisted
in the `__consumer_offsets` topic.
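
   To illustrate the point about local state, here is a minimal, self-contained
sketch. All names (`GroupSpec`, `Assignor`, `LeakyAssignor`, `StatelessAssignor`,
`assignUsing`) are hypothetical and deliberately simplified. This is not the
`TaskAssignor` interface from this PR, and the least-loaded placement is only a
stand-in for the real sticky logic. Both assignors run the same algorithm; the
only difference is whether the helper map lives in a field or inside `assign`.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class StatelessAssignDemo {

    // Hypothetical, simplified assignor input; in the coordinator, the real
    // inputs would be rebuilt from persisted records after a fail-over.
    record GroupSpec(SortedSet<String> members, SortedSet<String> tasks) { }

    // Hypothetical interface, not Kafka's TaskAssignor.
    interface Assignor {
        Map<String, String> assign(GroupSpec spec);
    }

    // Places each task on the least-loaded member (ties broken by name),
    // reading and updating the given load map.
    static Map<String, String> assignUsing(GroupSpec spec, Map<String, Integer> load) {
        Map<String, String> result = new TreeMap<>();
        for (String task : spec.tasks()) {
            String owner = spec.members().stream()
                .min(Comparator.comparingInt((String m) -> load.getOrDefault(m, 0))
                    .thenComparing(Comparator.naturalOrder()))
                .orElseThrow();
            load.merge(owner, 1, Integer::sum);
            result.put(task, owner);
        }
        return result;
    }

    // Anti-pattern: helper state is a field, so it leaks into later calls
    // and is lost on fail-over, when a fresh instance starts empty.
    static class LeakyAssignor implements Assignor {
        private final Map<String, Integer> load = new HashMap<>();

        @Override
        public Map<String, String> assign(GroupSpec spec) {
            return assignUsing(spec, load);
        }
    }

    // Preferred: helper state is local to assign(), so the result depends
    // only on the input, however long the instance lives.
    static class StatelessAssignor implements Assignor {
        @Override
        public Map<String, String> assign(GroupSpec spec) {
            return assignUsing(spec, new HashMap<>());
        }
    }

    public static void main(String[] args) {
        GroupSpec spec = new GroupSpec(
            new TreeSet<>(List.of("a", "b")),
            new TreeSet<>(List.of("t1", "t2", "t3")));

        Assignor leaky = new LeakyAssignor();
        System.out.println(leaky.assign(spec));               // {t1=a, t2=b, t3=a}
        System.out.println(leaky.assign(spec));               // {t1=b, t2=a, t3=b}
        System.out.println(new LeakyAssignor().assign(spec)); // {t1=a, t2=b, t3=a}

        Assignor stateless = new StatelessAssignor();
        System.out.println(stateless.assign(spec));           // {t1=a, t2=b, t3=a}
        System.out.println(stateless.assign(spec));           // {t1=a, t2=b, t3=a}
    }
}
```

With the leaky variant, the same input produces a different result on the
second call, and a fresh instance (as after a fail-over) disagrees with the
long-lived one. The stateless variant returns the same result every time.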
   
   The next question is: why not just create a new assignor every time we call
`assign`? I think the short answer is that we are simply doing the same as
consumer groups, which don't do this, and we had no reason to change it.
   
   The long answer (I'm somewhat speculating here) is probably that having an
assignor persist for longer than a single heartbeat is the slightly more generic
pattern. The assignor may, in the future, be plugged with custom, potentially
proprietary code (this already exists for consumer groups; we don't have the
interface for streams yet in KIP-1071, but may add it in the future). That code
may do all kinds of things, for example set up a connection to an external
service to perform the assignment, which may require one-time initialization
and local state (caches and such). So, while our default assignors like the
sticky assignor are essentially stateless (and should be stateless), making the
assignor interface short-lived would make stateful implementations hard, and I
believe this is not something we want. With this long-lived interface, we just
need to make sure that any local state within the `assign` function is
short-lived.
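
   A minimal sketch of that long-lived pattern. `PluggableAssignor`,
`ExpensiveClient` and its `weight` method are hypothetical illustrations, not
Kafka APIs. The client stands in for one-time initialization (such as a
connection to an external service), the weight cache is long-lived local state
that only saves work, and the per-call `load` map lives inside `assign`. As a
result, a fresh instance created after a fail-over computes the same assignment.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

public class LongLivedAssignorDemo {

    // Hypothetical stand-in for a costly resource (e.g. a client for an
    // external assignment service) that should be set up only once.
    static class ExpensiveClient {
        static int instancesCreated = 0;

        ExpensiveClient() {
            instancesCreated++; // pretend this opens a connection
        }

        // Deterministic "remote" lookup of a task's weight (here: name length).
        int weight(String task) {
            return task.length();
        }
    }

    // Hypothetical pluggable assignor; not Kafka's TaskAssignor interface.
    static class PluggableAssignor {
        // Long-lived: initialized once and reused across assign() calls.
        private final ExpensiveClient client = new ExpensiveClient();
        // Long-lived cache of a pure function of its key: it only saves work
        // and never changes the result, so losing it on fail-over is harmless.
        private final Map<String, Integer> weightCache = new ConcurrentHashMap<>();

        Map<String, String> assign(List<String> members, List<String> tasks) {
            // Short-lived: per-call helper state, discarded when assign() returns.
            Map<String, Integer> load = new HashMap<>();
            Map<String, String> result = new TreeMap<>();
            for (String task : tasks.stream().sorted().toList()) {
                int w = weightCache.computeIfAbsent(task, client::weight);
                // Least-loaded member by accumulated weight, ties broken by name.
                String owner = members.stream()
                    .min(Comparator.comparingInt((String m) -> load.getOrDefault(m, 0))
                        .thenComparing(Comparator.naturalOrder()))
                    .orElseThrow();
                load.merge(owner, w, Integer::sum);
                result.put(task, owner);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        List<String> members = List.of("a", "b");
        List<String> tasks = List.of("t1", "task2", "t3");

        PluggableAssignor longLived = new PluggableAssignor();
        Map<String, String> first = longLived.assign(members, tasks);
        Map<String, String> second = longLived.assign(members, tasks);

        // A fresh instance, as after a fail-over, yields the same assignment.
        Map<String, String> afterFailover = new PluggableAssignor().assign(members, tasks);

        System.out.println(first);                                              // {t1=a, t3=b, task2=a}
        System.out.println(first.equals(second) && first.equals(afterFailover)); // true
        System.out.println(ExpensiveClient.instancesCreated);                   // 2
    }
}
```

The design choice here is that anything long-lived is either a resource or a
cache of a pure function; anything that could change the result has to come
from the persisted inputs passed to `assign`.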


