[https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473240#comment-16473240]

ASF GitHub Bot commented on KAFKA-4696:
---------------------------------------

ConcurrencyPractitioner closed pull request #4615: [KAFKA-4696] Streams standby 
task assignment should be state-store aware
URL: https://github.com/apache/kafka/pull/4615
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index f4c9ce00814..94fce7f9ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -32,10 +32,13 @@
     public final int topicGroupId;
     /** The ID of the partition. */
     public final int partition;
+    /** The number of State Stores in the task. */
+    private int numberOfStateStores;
 
     public TaskId(int topicGroupId, int partition) {
         this.topicGroupId = topicGroupId;
         this.partition = partition;
+        this.setNumberOfStateStores(0);
     }
 
     public String toString() {
@@ -74,13 +77,20 @@ public static TaskId readFrom(DataInputStream in) throws 
IOException {
         return new TaskId(in.readInt(), in.readInt());
     }
 
-    public void writeTo(ByteBuffer buf) {
+    public void writeTo(ByteBuffer buf, final int version) {
         buf.putInt(topicGroupId);
         buf.putInt(partition);
+        if (version == 2) {
+            buf.putInt(numberOfStateStores);
+        }
     }
 
-    public static TaskId readFrom(ByteBuffer buf) {
-        return new TaskId(buf.getInt(), buf.getInt());
+    public static TaskId readFrom(ByteBuffer buf, final int version) {
+        final TaskId result = new TaskId(buf.getInt(), buf.getInt());
+        if (version == 2) {
+            result.setNumberOfStateStores(buf.getInt());
+        }
+        return result;
     }
 
     @Override
@@ -111,4 +121,12 @@ public int compareTo(TaskId other) {
                         (this.partition > other.partition ? 1 :
                             0)));
     }
+
+    public int numberOfStateStores() {
+        return numberOfStateStores;
+    }
+
+    public void setNumberOfStateStores(int numberOfStateStores) {
+        this.numberOfStateStores = numberOfStateStores;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..2f414ef11f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -68,6 +68,7 @@
                  final StateDirectory stateDirectory,
                  final StreamsConfig config) {
         this.id = id;
+        this.id.setNumberOfStateStores(topology.stateStores().size());
         this.applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 2a08308a2fd..271f24dbaa2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -121,8 +121,8 @@ public int hashCode() {
 
         void addConsumer(final String consumerMemberId, final SubscriptionInfo 
info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks);
-            state.addPreviousStandbyTasks(info.standbyTasks);
+            state.addPreviousActiveTasks(info.prevTasks());
+            state.addPreviousStandbyTasks(info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -288,11 +288,11 @@ public Subscription subscription(Set<String> topics) {
             SubscriptionInfo info = 
SubscriptionInfo.decode(subscription.userData());
 
             // create the new client metadata if necessary
-            ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId);
+            ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId());
 
             if (clientMetadata == null) {
-                clientMetadata = new ClientMetadata(info.userEndPoint);
-                clientsMetadata.put(info.processId, clientMetadata);
+                clientMetadata = new ClientMetadata(info.userEndPoint());
+                clientsMetadata.put(info.processId(), clientMetadata);
             }
 
             // add the consumer to the client
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 15ee849bffc..99c8af51b40 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -29,6 +29,7 @@
     private final Set<TaskId> prevAssignedTasks;
 
     private int capacity;
+    private int numberOfStateStores;
 
 
     public ClientState() {
@@ -37,6 +38,7 @@ public ClientState() {
 
     ClientState(final int capacity) {
         this(new HashSet<TaskId>(), new HashSet<TaskId>(), new 
HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
+        this.numberOfStateStores = 0;
     }
 
     private ClientState(Set<TaskId> activeTasks, Set<TaskId> standbyTasks, 
Set<TaskId> assignedTasks, Set<TaskId> prevActiveTasks, Set<TaskId> 
prevAssignedTasks, int capacity) {
@@ -60,6 +62,7 @@ public void assign(final TaskId taskId, final boolean active) 
{
             standbyTasks.add(taskId);
         }
 
+        numberOfStateStores += taskId.numberOfStateStores();
         assignedTasks.add(taskId);
     }
 
@@ -116,8 +119,8 @@ boolean hasMoreAvailableCapacityThan(final ClientState 
other) {
             throw new IllegalStateException("Capacity of other ClientState 
must be greater than 0");
         }
 
-        final double otherLoad = (double) other.assignedTaskCount() / 
other.capacity;
-        final double thisLoad = (double) assignedTaskCount() / capacity;
+        final double otherLoad =  ((double) other.assignedTaskCount() + 
(double) 0.5 * numberOfStateStores) / other.capacity;
+        final double thisLoad =  ((double) assignedTaskCount() + (double) 0.5 
* numberOfStateStores) / capacity;
 
         if (thisLoad < otherLoad)
             return true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dbafc94..b56dd55ece3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,109 +31,235 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 2;
 
-    public final int version;
-    public final UUID processId;
-    public final Set<TaskId> prevTasks;
-    public final Set<TaskId> standbyTasks;
-    public final String userEndPoint;
+    private final int usedVersion;
+    private UUID processId;
+    private Set<TaskId> prevTasks;
+    private Set<TaskId> standbyTasks;
+    private String userEndPoint;
 
-    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> 
standbyTasks, String userEndPoint) {
-        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, 
userEndPoint);
+    private SubscriptionInfo(final int version) {
+        this.usedVersion = version;
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> 
prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
-        this.version = version;
+    public SubscriptionInfo(final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, 
userEndPoint);
+    }
+
+    public SubscriptionInfo(final int version,
+                            final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this.usedVersion = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
         this.userEndPoint = userEndPoint;
     }
 
+    public int version() {
+        return usedVersion;
+    }
+
+    public UUID processId() {
+        return processId;
+    }
+
+    public Set<TaskId> prevTasks() {
+        return prevTasks;
+    }
+
+    public Set<TaskId> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public String userEndPoint() {
+        return userEndPoint;
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to encode the data
      */
     public ByteBuffer encode() {
-        byte[] endPointBytes;
+        final ByteBuffer buf;
+
+        switch (usedVersion) {
+            case 1:
+                buf = encodeVersionOne();
+                break;
+            case 2:
+                buf = encodeVersionTwo(prepareUserEndPoint());
+                break;
+            default:
+                throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
+                    + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
+        }
+
+        buf.rewind();
+        return buf;
+    }
+
+    private ByteBuffer encodeVersionOne() {
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
+
+        buf.putInt(1); // version
+        encodeVersionOneData(buf);
+
+        return buf;
+    }
+
+    private int getVersionOneByteLength() {
+        return 4 + // version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8; // length + standby tasks
+    }
+
+    private void encodeVersionOneData(final ByteBuffer buf) {
+        // encode client UUID
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
+        // encode ids of previously running tasks
+        buf.putInt(prevTasks.size());
+        for (TaskId id : prevTasks) {
+            id.writeTo(buf, 1);
+        }
+        // encode ids of cached tasks
+        buf.putInt(standbyTasks.size());
+        for (TaskId id : standbyTasks) {
+            id.writeTo(buf, 1);
+        }
+    }
+
+    private byte[] prepareUserEndPoint() {
         if (userEndPoint == null) {
-            endPointBytes = new byte[0];
+            return new byte[0];
         } else {
-            endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
+            return userEndPoint.getBytes(Charset.forName("UTF-8"));
         }
-        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process 
id */ + 4 +
-                prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
-                + 4 /* length of bytes */ + endPointBytes.length
-        );
-        // version
-        buf.putInt(version);
+    }
+
+    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
+
+        buf.putInt(2); // version
+        encodeVersionTwoData(buf, endPointBytes);
+
+        return buf;
+    }
+
+    private int getVersionTwoByteLength(final byte[] endPointBytes) {
+        return getVersionOneByteLength() + (prevTasks.size() + 
standbyTasks.size()) * 4 +
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
+    private void encodeVersionTwoData(final ByteBuffer buf,
+                                      final byte[] endPointBytes) {
         // encode client UUID
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
         // encode ids of previously running tasks
         buf.putInt(prevTasks.size());
         for (TaskId id : prevTasks) {
-            id.writeTo(buf);
+            id.writeTo(buf, 2);
         }
         // encode ids of cached tasks
         buf.putInt(standbyTasks.size());
         for (TaskId id : standbyTasks) {
-            id.writeTo(buf);
+            id.writeTo(buf, 2);
+        }
+        if (endPointBytes != null) {
+            buf.putInt(endPointBytes.length);
+            buf.put(endPointBytes);
         }
-        buf.putInt(endPointBytes.length);
-        buf.put(endPointBytes);
-        buf.rewind();
-        return buf;
     }
 
     /**
      * @throws TaskAssignmentException if method fails to decode the data
      */
-    public static SubscriptionInfo decode(ByteBuffer data) {
+    public static SubscriptionInfo decode(final ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // Decode version
-        int version = data.getInt();
-        if (version == CURRENT_VERSION || version == 1) {
-            // Decode client UUID
-            UUID processId = new UUID(data.getLong(), data.getLong());
-            // Decode previously active tasks
-            Set<TaskId> prevTasks = new HashSet<>();
-            int numPrevs = data.getInt();
-            for (int i = 0; i < numPrevs; i++) {
-                TaskId id = TaskId.readFrom(data);
-                prevTasks.add(id);
-            }
-            // Decode previously cached tasks
-            Set<TaskId> standbyTasks = new HashSet<>();
-            int numCached = data.getInt();
-            for (int i = 0; i < numCached; i++) {
-                standbyTasks.add(TaskId.readFrom(data));
-            }
-
-            String userEndPoint = null;
-            if (version == CURRENT_VERSION) {
-                int bytesLength = data.getInt();
-                if (bytesLength != 0) {
-                    byte[] bytes = new byte[bytesLength];
-                    data.get(bytes);
-                    userEndPoint = new String(bytes, Charset.forName("UTF-8"));
-                }
-
-            }
-            return new SubscriptionInfo(version, processId, prevTasks, 
standbyTasks, userEndPoint);
+        // decode used version
+        final int usedVersion = data.getInt();
+        final SubscriptionInfo subscriptionInfo = new 
SubscriptionInfo(usedVersion);
 
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable 
to decode subscription data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+        switch (usedVersion) {
+            case 1:
+                decodeVersionOneData(subscriptionInfo, data);
+                break;
+            case 2:
+                decodeVersionTwoData(subscriptionInfo, data);
+                break;
+            default:
+                TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode subscription data: " +
+                    "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
+                log.error(fatalException.getMessage(), fatalException);
+                throw fatalException;
+        }
+
+        return subscriptionInfo;
+    }
+
+    private static void decodeVersionOneData(final SubscriptionInfo 
subscriptionInfo,
+                                             final ByteBuffer data) {
+        // decode client UUID
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+
+        // decode previously active tasks
+        final int numPrevs = data.getInt();
+        subscriptionInfo.prevTasks = new HashSet<>();
+        for (int i = 0; i < numPrevs; i++) {
+            TaskId id = TaskId.readFrom(data, 1);
+            subscriptionInfo.prevTasks.add(id);
+        }
+
+        // decode previously cached tasks
+        final int numCached = data.getInt();
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        for (int i = 0; i < numCached; i++) {
+            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data, 1));
+        }
+    }
+
+    private static void decodeVersionTwoData(final SubscriptionInfo 
subscriptionInfo,
+                                             final ByteBuffer data) {
+        // decode client UUID
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+
+        // decode previously active tasks
+        final int numPrevs = data.getInt();
+        subscriptionInfo.prevTasks = new HashSet<>();
+        for (int i = 0; i < numPrevs; i++) {
+            TaskId id = TaskId.readFrom(data, 2);
+            subscriptionInfo.prevTasks.add(id);
+        }
+
+        // decode previously cached tasks
+        final int numCached = data.getInt();
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        for (int i = 0; i < numCached; i++) {
+            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data, 2));
+        }
+
+        // decode user end point (can be null)
+        int bytesLength = data.getInt();
+        if (bytesLength != 0) {
+            final byte[] bytes = new byte[bytesLength];
+            data.get(bytes);
+            subscriptionInfo.userEndPoint = new String(bytes, 
Charset.forName("UTF-8"));
         }
     }
 
     @Override
     public int hashCode() {
-        int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ 
standbyTasks.hashCode();
+        final int hashCode = usedVersion ^ processId.hashCode() ^ 
prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
             return hashCode;
         }
@@ -141,10 +267,10 @@ public int hashCode() {
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (o instanceof SubscriptionInfo) {
-            SubscriptionInfo other = (SubscriptionInfo) o;
-            return this.version == other.version &&
+            final SubscriptionInfo other = (SubscriptionInfo) o;
+            return this.usedVersion == other.usedVersion &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
@@ -154,4 +280,4 @@ public boolean equals(Object o) {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 02ab803735a..1058f5ea9b0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -847,7 +847,7 @@ public void shouldAddUserDefinedEndPointToSubscription() 
throws Exception {
         
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 (Object) userEndPoint));
         final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = 
SubscriptionInfo.decode(subscription.userData());
-        assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+        assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 9c011bb0cae..ceb0161dc22 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -65,10 +65,10 @@ public void shouldBeBackwardCompatible() {
 
         final ByteBuffer v1Encoding = encodePreviousVersion(processId, 
activeTasks, standbyTasks);
         final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
-        assertEquals(activeTasks, decode.prevTasks);
-        assertEquals(standbyTasks, decode.standbyTasks);
-        assertEquals(processId, decode.processId);
-        assertNull(decode.userEndPoint);
+        assertEquals(activeTasks, decode.prevTasks());
+        assertEquals(standbyTasks, decode.standbyTasks());
+        assertEquals(processId, decode.processId());
+        assertNull(decode.userEndPoint());
 
     }
 
@@ -87,12 +87,12 @@ private ByteBuffer encodePreviousVersion(UUID processId,  
Set<TaskId> prevTasks,
         // encode ids of previously running tasks
         buf.putInt(prevTasks.size());
         for (TaskId id : prevTasks) {
-            id.writeTo(buf);
+            id.writeTo(buf, 1);
         }
         // encode ids of cached tasks
         buf.putInt(standbyTasks.size());
         for (TaskId id : standbyTasks) {
-            id.writeTo(buf);
+            id.writeTo(buf, 1);
         }
         buf.rewind();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams standby task assignment should be state-store aware
> -----------------------------------------------------------
>
>                 Key: KAFKA-4696
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4696
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.11.0.0
>            Reporter: Damian Guy
>            Assignee: Richard Yu
>            Priority: Major
>
> Task assignment is currently not aware of which tasks have state stores. This 
> can result in an unevenly balanced standby task assignment: all tasks are 
> assigned, but {{StreamThread}} only ever creates (standby) tasks for those 
> that have state stores. So what looks like an optimal assignment at 
> assignment time can turn out to be sub-optimal once the tasks are created.
> For example, let's say we have 4 tasks (2 with state stores), 2 clients, and 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. 
> One of the clients may end up with both of the tasks that have state stores, 
> while the other has none.
> Furthermore, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine-grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to