[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386548#comment-16386548 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4630: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path
URL: https://github.com/apache/kafka/pull/4630

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

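The pattern running through the whole diff is that both metadata blobs
(SubscriptionInfo and AssignmentInfo) now write their version as the first
field and dispatch encoding/decoding on it, with a shared
LATEST_SUPPORTED_VERSION constant replacing the old CURRENT_VERSION. A minimal
sketch of that dispatch, using simplified hypothetical names rather than the
actual Kafka classes:

    import java.nio.ByteBuffer;

    public class VersionedMetadata {
        public static final int LATEST_SUPPORTED_VERSION = 2;

        // The version is always the first int in the buffer, so it can be read
        // up front to decide how the rest of the payload must be decoded.
        public static int readUsedVersion(final ByteBuffer data) {
            data.rewind();
            final int usedVersion = data.getInt();
            if (usedVersion < 1 || usedVersion > LATEST_SUPPORTED_VERSION) {
                throw new IllegalStateException("Unknown metadata version: " + usedVersion
                    + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
            }
            return usedVersion;
        }
    }
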
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b3626101bd..47becfc239b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -226,11 +226,11 @@
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = 
"default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default 
serializer / deserializer class for key that implements the 
<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
-    /** {@code default timestamp.extractor} */
+    /** {@code default.timestamp.extractor} */
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = 
"default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = 
"Default timestamp extractor class that implements the 
<code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
 
-    /** {@code default value.serde} */
+    /** {@code default.value.serde} */
     public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = 
"default.value.serde";
     private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default 
serializer / deserializer class for value that implements the 
<code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94c8c1..71a84b2ca73 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -66,7 +66,8 @@
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        AssignedPartition(final TaskId taskId, final TopicPartition partition) 
{
+        AssignedPartition(final TaskId taskId,
+                          final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
@@ -77,11 +78,11 @@ public int compareTo(final AssignedPartition that) {
         }
 
         @Override
-        public boolean equals(Object o) {
+        public boolean equals(final Object o) {
             if (!(o instanceof AssignedPartition)) {
                 return false;
             }
-            AssignedPartition other = (AssignedPartition) o;
+            final AssignedPartition other = (AssignedPartition) o;
             return compareTo(other) == 0;
         }
 
@@ -104,8 +105,9 @@ public int hashCode() {
                 final String host = getHost(endPoint);
                 final Integer port = getPort(endPoint);
 
-                if (host == null || port == null)
+                if (host == null || port == null) {
                     throw new ConfigException(String.format("Error parsing 
host address %s. Expected format host:port.", endPoint));
+                }
 
                 hostInfo = new HostInfo(host, port);
             } else {
@@ -119,10 +121,11 @@ public int hashCode() {
             state = new ClientState();
         }
 
-        void addConsumer(final String consumerMemberId, final SubscriptionInfo 
info) {
+        void addConsumer(final String consumerMemberId,
+                         final SubscriptionInfo info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks);
-            state.addPreviousStandbyTasks(info.standbyTasks);
+            state.addPreviousActiveTasks(info.prevTasks());
+            state.addPreviousStandbyTasks(info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -157,8 +160,9 @@ public String toString() {
 
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new 
Comparator<TopicPartition>() {
         @Override
-        public int compare(TopicPartition p1, TopicPartition p2) {
-            int result = p1.topic().compareTo(p2.topic());
+        public int compare(final TopicPartition p1,
+                           final TopicPartition p2) {
+            final int result = p1.topic().compareTo(p2.topic());
 
             if (result != 0) {
                 return result;
@@ -194,15 +198,15 @@ public void configure(final Map<String, ?> configs) {
 
         final Object o = 
configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
         if (o == null) {
-            KafkaException ex = new KafkaException("TaskManager is not 
specified");
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            final KafkaException fatalException = new 
KafkaException("TaskManager is not specified");
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
         }
 
         if (!(o instanceof TaskManager)) {
-            KafkaException ex = new KafkaException(String.format("%s is not an 
instance of %s", o.getClass().getName(), TaskManager.class.getName()));
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            final KafkaException fatalException = new 
KafkaException(String.format("%s is not an instance of %s", 
o.getClass().getName(), TaskManager.class.getName()));
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
         }
 
         taskManager = (TaskManager) o;
@@ -214,14 +218,14 @@ public void configure(final Map<String, ?> configs) {
         final String userEndPoint = 
streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
             try {
-                String host = getHost(userEndPoint);
-                Integer port = getPort(userEndPoint);
+                final String host = getHost(userEndPoint);
+                final Integer port = getPort(userEndPoint);
 
                 if (host == null || port == null)
                     throw new ConfigException(String.format("%s Config %s 
isn't in the correct format. Expected a host:port pair" +
                                     " but received %s",
                             logPrefix, 
StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
-            } catch (NumberFormatException nfe) {
+            } catch (final NumberFormatException nfe) {
                 throw new ConfigException(String.format("%s Invalid port 
supplied in %s for config %s",
                         logPrefix, userEndPoint, 
StreamsConfig.APPLICATION_SERVER_CONFIG));
             }
@@ -240,7 +244,7 @@ public String name() {
     }
 
     @Override
-    public Subscription subscription(Set<String> topics) {
+    public Subscription subscription(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Task ids of previously running tasks
@@ -249,7 +253,11 @@ public Subscription subscription(Set<String> topics) {
         final Set<TaskId> previousActiveTasks = 
taskManager.prevActiveTaskIds();
         final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
         standbyTasks.removeAll(previousActiveTasks);
-        final SubscriptionInfo data = new 
SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, 
this.userEndPoint);
+        final SubscriptionInfo data = new SubscriptionInfo(
+            taskManager.processId(),
+            previousActiveTasks,
+            standbyTasks,
+            this.userEndPoint);
 
         taskManager.updateSubscriptionsFromMetadata(topics);
 
@@ -277,22 +285,32 @@ public Subscription subscription(Set<String> topics) {
      * 3. within each client, tasks are assigned to consumer clients in 
round-robin manner.
      */
     @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, 
Subscription> subscriptions) {
+    public Map<String, Assignment> assign(final Cluster metadata,
+                                          final Map<String, Subscription> 
subscriptions) {
         // construct the client metadata from the decoded subscription info
-        Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
-
-        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) 
{
-            String consumerId = entry.getKey();
-            Subscription subscription = entry.getValue();
-
-            SubscriptionInfo info = 
SubscriptionInfo.decode(subscription.userData());
+        final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
+        int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        for (final Map.Entry<String, Subscription> entry : 
subscriptions.entrySet()) {
+            final String consumerId = entry.getKey();
+            final Subscription subscription = entry.getValue();
+
+            final SubscriptionInfo info = 
SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version();
+            if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+                throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
+                    + "; latest supported version: " + 
SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+            }
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
 
             // create the new client metadata if necessary
-            ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId);
+            ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId());
 
             if (clientMetadata == null) {
-                clientMetadata = new ClientMetadata(info.userEndPoint);
-                clientsMetadata.put(info.processId, clientMetadata);
+                clientMetadata = new ClientMetadata(info.userEndPoint());
+                clientsMetadata.put(info.processId(), clientMetadata);
             }
 
             // add the consumer to the client
@@ -309,8 +327,8 @@ public Subscription subscription(Set<String> topics) {
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = 
taskManager.builder().topicGroups();
 
         final Map<String, InternalTopicMetadata> repartitionTopicMetadata = 
new HashMap<>();
-        for (InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
-            for (InternalTopicConfig topic: 
topicsInfo.repartitionSourceTopics.values()) {
+        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
+            for (final InternalTopicConfig topic: 
topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new 
InternalTopicMetadata(topic));
             }
         }
@@ -319,13 +337,13 @@ public Subscription subscription(Set<String> topics) {
         do {
             numPartitionsNeeded = false;
 
-            for (InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
-                for (String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
+                for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
                     int numPartitions = 
repartitionTopicMetadata.get(topicName).numPartitions;
 
                     // try set the number of partitions for this repartition 
topic if it is not set yet
                     if (numPartitions == UNKNOWN) {
-                        for (InternalTopologyBuilder.TopicsInfo 
otherTopicsInfo : topicGroups.values()) {
+                        for (final InternalTopologyBuilder.TopicsInfo 
otherTopicsInfo : topicGroups.values()) {
                             final Set<String> otherSinkTopics = 
otherTopicsInfo.sinkTopics;
 
                             if (otherSinkTopics.contains(topicName)) {
@@ -375,7 +393,7 @@ public Subscription subscription(Set<String> topics) {
         // augment the metadata with the newly computed number of partitions 
for all the
         // repartition source topics
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions 
= new HashMap<>();
-        for (Map.Entry<String, InternalTopicMetadata> entry : 
repartitionTopicMetadata.entrySet()) {
+        for (final Map.Entry<String, InternalTopicMetadata> entry : 
repartitionTopicMetadata.entrySet()) {
             final String topic = entry.getKey();
             final int numPartitions = entry.getValue().numPartitions;
 
@@ -395,7 +413,7 @@ public Subscription subscription(Set<String> topics) {
         // get the tasks as partition groups from the partition grouper
         final Set<String> allSourceTopics = new HashSet<>();
         final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : 
topicGroups.entrySet()) {
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> 
entry : topicGroups.entrySet()) {
             allSourceTopics.addAll(entry.getValue().sourceTopics);
             sourceTopicsByGroup.put(entry.getKey(), 
entry.getValue().sourceTopics);
         }
@@ -405,9 +423,9 @@ public Subscription subscription(Set<String> topics) {
         // check if all partitions are assigned, and there are no duplicates 
of partitions in multiple tasks
         final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
         final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
partitionsForTask.entrySet()) {
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
partitionsForTask.entrySet()) {
             final Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
+            for (final TopicPartition partition : partitions) {
                 if (allAssignedPartitions.contains(partition)) {
                     log.warn("Partition {} is assigned to more than one tasks: 
{}", partition, partitionsForTask);
                 }
@@ -422,10 +440,10 @@ public Subscription subscription(Set<String> topics) {
             }
             ids.add(id);
         }
-        for (String topic : allSourceTopics) {
+        for (final String topic : allSourceTopics) {
             final List<PartitionInfo> partitionInfoList = 
fullMetadata.partitionsForTopic(topic);
             if (!partitionInfoList.isEmpty()) {
-                for (PartitionInfo partitionInfo : partitionInfoList) {
+                for (final PartitionInfo partitionInfo : partitionInfoList) {
                     final TopicPartition partition = new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
                         log.warn("Partition {} is not assigned to any tasks: 
{}", partition, partitionsForTask);
@@ -438,15 +456,15 @@ public Subscription subscription(Set<String> topics) {
 
         // add tasks to state change log topic subscribers
         final Map<String, InternalTopicMetadata> changelogTopicMetadata = new 
HashMap<>();
-        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : 
topicGroups.entrySet()) {
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> 
entry : topicGroups.entrySet()) {
             final int topicGroupId = entry.getKey();
             final Map<String, InternalTopicConfig> stateChangelogTopics = 
entry.getValue().stateChangelogTopics;
 
-            for (InternalTopicConfig topicConfig : 
stateChangelogTopics.values()) {
+            for (final InternalTopicConfig topicConfig : 
stateChangelogTopics.values()) {
                 // the expected number of partitions is the max value of 
TaskId.partition + 1
                 int numPartitions = UNKNOWN;
                 if (tasksByTopicGroup.get(topicGroupId) != null) {
-                    for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+                    for (final TaskId task : 
tasksByTopicGroup.get(topicGroupId)) {
                         if (numPartitions < task.partition + 1)
                             numPartitions = task.partition + 1;
                     }
@@ -468,7 +486,7 @@ public Subscription subscription(Set<String> topics) {
 
         // assign tasks to clients
         final Map<UUID, ClientState> states = new HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
             states.put(entry.getKey(), entry.getValue().state);
         }
 
@@ -484,25 +502,27 @@ public Subscription subscription(Set<String> topics) {
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new 
HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
-            final HostInfo hostInfo = entry.getValue().hostInfo;
+        if (minUserMetadataVersion == 2) {
+            for (final Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
+                final HostInfo hostInfo = entry.getValue().hostInfo;
 
-            if (hostInfo != null) {
-                final Set<TopicPartition> topicPartitions = new HashSet<>();
-                final ClientState state = entry.getValue().state;
+                if (hostInfo != null) {
+                    final Set<TopicPartition> topicPartitions = new 
HashSet<>();
+                    final ClientState state = entry.getValue().state;
 
-                for (final TaskId id : state.activeTasks()) {
-                    topicPartitions.addAll(partitionsForTask.get(id));
-                }
+                    for (final TaskId id : state.activeTasks()) {
+                        topicPartitions.addAll(partitionsForTask.get(id));
+                    }
 
-                partitionsByHostState.put(hostInfo, topicPartitions);
+                    partitionsByHostState.put(hostInfo, topicPartitions);
+                }
             }
         }
         taskManager.setPartitionsByHostState(partitionsByHostState);
 
         // within the client, distribute tasks to its owned consumers
         final Map<String, Assignment> assignment = new HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
             final Set<String> consumers = entry.getValue().consumers;
             final ClientState state = entry.getValue().state;
 
@@ -511,7 +531,7 @@ public Subscription subscription(Set<String> topics) {
 
             int consumerTaskIndex = 0;
 
-            for (String consumer : consumers) {
+            for (final String consumer : consumers) {
                 final Map<TaskId, Set<TopicPartition>> standby = new 
HashMap<>();
                 final ArrayList<AssignedPartition> assignedPartitions = new 
ArrayList<>();
 
@@ -540,13 +560,15 @@ public Subscription subscription(Set<String> topics) {
                 Collections.sort(assignedPartitions);
                 final List<TaskId> active = new ArrayList<>();
                 final List<TopicPartition> activePartitions = new 
ArrayList<>();
-                for (AssignedPartition partition : assignedPartitions) {
+                for (final AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
                 }
 
                 // finally, encode the assignment before sending back to 
coordinator
-                assignment.put(consumer, new Assignment(activePartitions, new 
AssignmentInfo(active, standby, partitionsByHostState).encode()));
+                assignment.put(consumer, new Assignment(
+                    activePartitions,
+                    new AssignmentInfo(minUserMetadataVersion, active, 
standby, partitionsByHostState).encode()));
             }
         }
 
@@ -577,26 +599,54 @@ public Subscription subscription(Set<String> topics) {
      * @throws TaskAssignmentException if there is no task id for one of the 
partitions specified
      */
     @Override
-    public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = new 
ArrayList<>(assignment.partitions());
+    public void onAssignment(final Assignment assignment) {
+        final List<TopicPartition> partitions = new 
ArrayList<>(assignment.partitions());
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
+        final int usedVersion = info.version();
 
-        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        // version 1 field
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        // version 2 fields
+        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new 
HashMap<>();
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+        switch (usedVersion) {
+            case 1:
+                processVersionOneAssignment(info, partitions, activeTasks);
+                partitionsByHost = Collections.emptyMap();
+                break;
+            case 2:
+                processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
+            default:
+                throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
+                    + "; latest supported version: " + 
AssignmentInfo.LATEST_SUPPORTED_VERSION);
+        }
 
+        
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
+        taskManager.setPartitionsByHostState(partitionsByHost);
+        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
+        taskManager.updateSubscriptionsFromAssignment(partitions);
+    }
+
+    private void processVersionOneAssignment(final AssignmentInfo info,
+                                             final List<TopicPartition> 
partitions,
+                                             final Map<TaskId, 
Set<TopicPartition>> activeTasks) {
         // the number of assigned partitions should be the same as number of 
active tasks, which
         // could be duplicated if one task has more than one assigned 
partitions
-        if (partitions.size() != info.activeTasks.size()) {
+        if (partitions.size() != info.activeTasks().size()) {
             throw new TaskAssignmentException(
-                    String.format("%sNumber of assigned partitions %d is not 
equal to the number of active taskIds %d" +
-                            ", assignmentInfo=%s", logPrefix, 
partitions.size(), info.activeTasks.size(), info.toString())
+                String.format("%sNumber of assigned partitions %d is not equal 
to the number of active taskIds %d" +
+                    ", assignmentInfo=%s", logPrefix, partitions.size(), 
info.activeTasks().size(), info.toString())
             );
         }
 
         for (int i = 0; i < partitions.size(); i++) {
-            TopicPartition partition = partitions.get(i);
-            TaskId id = info.activeTasks.get(i);
+            final TopicPartition partition = partitions.get(i);
+            final TaskId id = info.activeTasks().get(i);
 
             Set<TopicPartition> assignedPartitions = activeTasks.get(id);
             if (assignedPartitions == null) {
@@ -605,23 +655,23 @@ public void onAssignment(Assignment assignment) {
             }
             assignedPartitions.add(partition);
         }
+    }
 
-        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new 
HashMap<>();
-        for (Set<TopicPartition> value : info.partitionsByHost.values()) {
-            for (TopicPartition topicPartition : value) {
-                topicToPartitionInfo.put(topicPartition, new 
PartitionInfo(topicPartition.topic(),
-                        topicPartition.partition(),
-                        null,
-                        new Node[0],
-                        new Node[0]));
+    private void processVersionTwoAssignment(final AssignmentInfo info,
+                                             final List<TopicPartition> 
partitions,
+                                             final Map<TaskId, 
Set<TopicPartition>> activeTasks,
+                                             final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
+        processVersionOneAssignment(info, partitions, activeTasks);
+
+        // process partitions by host
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = 
info.partitionsByHost();
+        for (final Set<TopicPartition> value : partitionsByHost.values()) {
+            for (final TopicPartition topicPartition : value) {
+                topicToPartitionInfo.put(
+                    topicPartition,
+                    new PartitionInfo(topicPartition.topic(), 
topicPartition.partition(), null, new Node[0], new Node[0]));
             }
         }
-
-        
taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
-        taskManager.setPartitionsByHostState(info.partitionsByHost);
-        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
-
-        taskManager.updateSubscriptionsFromAssignment(partitions);
     }
 
     /**
@@ -658,10 +708,10 @@ private void prepareTopic(final Map<String, 
InternalTopicMetadata> topicPartitio
         log.debug("Completed validating internal topics in partition 
assignor.");
     }
 
-    private void ensureCopartitioning(Collection<Set<String>> 
copartitionGroups,
-                                      Map<String, InternalTopicMetadata> 
allRepartitionTopicsNumPartitions,
-                                      Cluster metadata) {
-        for (Set<String> copartitionGroup : copartitionGroups) {
+    private void ensureCopartitioning(final Collection<Set<String>> 
copartitionGroups,
+                                      final Map<String, InternalTopicMetadata> 
allRepartitionTopicsNumPartitions,
+                                      final Cluster metadata) {
+        for (final Set<String> copartitionGroup : copartitionGroups) {
             copartitionedTopicsValidator.validate(copartitionGroup, 
allRepartitionTopicsNumPartitions, metadata);
         }
     }
@@ -677,7 +727,7 @@ private void ensureCopartitioning(Collection<Set<String>> 
copartitionGroups,
 
         private final Set<String> updatedTopicSubscriptions = new HashSet<>();
 
-        public void updateTopics(Collection<String> topicNames) {
+        public void updateTopics(final Collection<String> topicNames) {
             updatedTopicSubscriptions.clear();
             updatedTopicSubscriptions.addAll(topicNames);
         }
@@ -735,7 +785,7 @@ void validate(final Set<String> copartitionGroup,
             // if all topics for this co-partition group is repartition topics,
             // then set the number of partitions to be the maximum of the 
number of partitions.
             if (numPartitions == UNKNOWN) {
-                for (Map.Entry<String, InternalTopicMetadata> entry: 
allRepartitionTopicsNumPartitions.entrySet()) {
+                for (final Map.Entry<String, InternalTopicMetadata> entry: 
allRepartitionTopicsNumPartitions.entrySet()) {
                     if (copartitionGroup.contains(entry.getKey())) {
                         final int partitions = entry.getValue().numPartitions;
                         if (partitions > numPartitions) {
@@ -745,7 +795,7 @@ void validate(final Set<String> copartitionGroup,
                 }
             }
             // enforce co-partitioning restrictions to repartition topics by 
updating their number of partitions
-            for (Map.Entry<String, InternalTopicMetadata> entry : 
allRepartitionTopicsNumPartitions.entrySet()) {
+            for (final Map.Entry<String, InternalTopicMetadata> entry : 
allRepartitionTopicsNumPartitions.entrySet()) {
                 if (copartitionGroup.contains(entry.getKey())) {
                     entry.getValue().numPartitions = numPartitions;
                 }
@@ -755,7 +805,7 @@ void validate(final Set<String> copartitionGroup,
     }
 
     // following functions are for test only
-    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+    void setInternalTopicManager(final InternalTopicManager 
internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
 }
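
A note on the assign() changes above: the leader now tracks the smallest
metadata version reported across all subscriptions and encodes the assignment
with that version, so members still on the older wire format can decode it.
A stand-alone sketch of that downgrade rule, with hypothetical class and
method names:

    import java.util.Arrays;
    import java.util.List;

    public class MinVersionExample {
        static final int LATEST_SUPPORTED_VERSION = 2;

        // Reject versions newer than we understand; otherwise fall back to the
        // smallest version any group member announced.
        static int minimumUsedVersion(final List<Integer> subscriptionVersions) {
            int minUserMetadataVersion = LATEST_SUPPORTED_VERSION;
            for (final int usedVersion : subscriptionVersions) {
                if (usedVersion > LATEST_SUPPORTED_VERSION) {
                    throw new IllegalStateException("Unknown metadata version: " + usedVersion);
                }
                if (usedVersion < minUserMetadataVersion) {
                    minUserMetadataVersion = usedVersion;
                }
            }
            return minUserMetadataVersion;
        }

        public static void main(final String[] args) {
            // a group mixing version 1 and version 2 members falls back to version 1
            System.out.println(minimumUsedVersion(Arrays.asList(2, 1, 2))); // prints 1
        }
    }
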
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 8607472c281..c8df7498755 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -39,76 +39,123 @@
 public class AssignmentInfo {
 
     private static final Logger log = 
LoggerFactory.getLogger(AssignmentInfo.class);
-    /**
-     * A new field was added, partitionsByHost. CURRENT_VERSION
-     * is required so we can decode the previous version. For example, this 
may occur
-     * during a rolling upgrade
-     */
-    private static final int CURRENT_VERSION = 2;
-    public final int version;
-    public final List<TaskId> activeTasks; // each element corresponds to a 
partition
-    public final Map<TaskId, Set<TopicPartition>> standbyTasks;
-    public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, 
Set<TopicPartition>> standbyTasks,
-                          Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
+    public static final int LATEST_SUPPORTED_VERSION = 2;
+
+    private final int usedVersion;
+    private List<TaskId> activeTasks;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+    private AssignmentInfo(final int version) {
+        this.usedVersion = version;
+    }
+
+    public AssignmentInfo(final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, 
Map<TaskId, Set<TopicPartition>> standbyTasks,
-                             Map<HostInfo, Set<TopicPartition>> hostState) {
-        this.version = version;
+    public AssignmentInfo(final int version,
+                          final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this.usedVersion = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
     }
 
+    public int version() {
+        return usedVersion;
+    }
+
+    public List<TaskId> activeTasks() {
+        return activeTasks;
+    }
+
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public Map<HostInfo, Set<TopicPartition>> partitionsByHost() {
+        return partitionsByHost;
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to encode the data, 
e.g., if there is an
      * IO exception during encoding
      */
     public ByteBuffer encode() {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-
-        try {
-            // Encode version
-            out.writeInt(version);
-            // Encode active tasks
-            out.writeInt(activeTasks.size());
-            for (TaskId id : activeTasks) {
-                id.writeTo(out);
-            }
-            // Encode standby tasks
-            out.writeInt(standbyTasks.size());
-            for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
standbyTasks.entrySet()) {
-                TaskId id = entry.getKey();
-                id.writeTo(out);
-
-                Set<TopicPartition> partitions = entry.getValue();
-                writeTopicPartitions(out, partitions);
-            }
-            out.writeInt(partitionsByHost.size());
-            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : 
partitionsByHost
-                    .entrySet()) {
-                final HostInfo hostInfo = entry.getKey();
-                out.writeUTF(hostInfo.host());
-                out.writeInt(hostInfo.port());
-                writeTopicPartitions(out, entry.getValue());
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (final DataOutputStream out = new DataOutputStream(baos)) {
+            switch (usedVersion) {
+                case 1:
+                    encodeVersionOne(out);
+                    break;
+                case 2:
+                    encodeVersionTwo(out);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown metadata version: 
" + usedVersion
+                        + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
             }
 
             out.flush();
             out.close();
 
             return ByteBuffer.wrap(baos.toByteArray());
-        } catch (IOException ex) {
+        } catch (final IOException ex) {
             throw new TaskAssignmentException("Failed to encode 
AssignmentInfo", ex);
         }
     }
 
-    private void writeTopicPartitions(DataOutputStream out, 
Set<TopicPartition> partitions) throws IOException {
+    private void encodeVersionOne(final DataOutputStream out) throws 
IOException {
+        out.writeInt(1); // version
+        encodeActiveAndStandbyTaskAssignment(out);
+    }
+
+    private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream 
out) throws IOException {
+        // encode active tasks
+        out.writeInt(activeTasks.size());
+        for (final TaskId id : activeTasks) {
+            id.writeTo(out);
+        }
+
+        // encode standby tasks
+        out.writeInt(standbyTasks.size());
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
standbyTasks.entrySet()) {
+            final TaskId id = entry.getKey();
+            id.writeTo(out);
+
+            final Set<TopicPartition> partitions = entry.getValue();
+            writeTopicPartitions(out, partitions);
+        }
+    }
+
+    private void encodeVersionTwo(final DataOutputStream out) throws 
IOException {
+        out.writeInt(2); // version
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+    }
+
+    private void encodePartitionsByHost(final DataOutputStream out) throws 
IOException {
+        // encode partitions by host
+        out.writeInt(partitionsByHost.size());
+        for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : 
partitionsByHost.entrySet()) {
+            final HostInfo hostInfo = entry.getKey();
+            out.writeUTF(hostInfo.host());
+            out.writeInt(hostInfo.port());
+            writeTopicPartitions(out, entry.getValue());
+        }
+    }
+
+    private void writeTopicPartitions(final DataOutputStream out,
+                                      final Set<TopicPartition> partitions) 
throws IOException {
         out.writeInt(partitions.size());
-        for (TopicPartition partition : partitions) {
+        for (final TopicPartition partition : partitions) {
             out.writeUTF(partition.topic());
             out.writeInt(partition.partition());
         }
@@ -117,52 +164,69 @@ private void writeTopicPartitions(DataOutputStream out, 
Set<TopicPartition> part
     /**
      * @throws TaskAssignmentException if method fails to decode the data or 
if the data version is unknown
      */
-    public static AssignmentInfo decode(ByteBuffer data) {
+    public static AssignmentInfo decode(final ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        try (DataInputStream in = new DataInputStream(new 
ByteBufferInputStream(data))) {
-            // Decode version
-            int version = in.readInt();
-            if (version < 0 || version > CURRENT_VERSION) {
-                TaskAssignmentException ex = new 
TaskAssignmentException("Unknown assignment data version: " + version);
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
+        try (final DataInputStream in = new DataInputStream(new 
ByteBufferInputStream(data))) {
+            // decode used version
+            final int usedVersion = in.readInt();
+            final AssignmentInfo assignmentInfo = new 
AssignmentInfo(usedVersion);
 
-            // Decode active tasks
-            int count = in.readInt();
-            List<TaskId> activeTasks = new ArrayList<>(count);
-            for (int i = 0; i < count; i++) {
-                activeTasks.add(TaskId.readFrom(in));
-            }
-            // Decode standby tasks
-            count = in.readInt();
-            Map<TaskId, Set<TopicPartition>> standbyTasks = new 
HashMap<>(count);
-            for (int i = 0; i < count; i++) {
-                TaskId id = TaskId.readFrom(in);
-                standbyTasks.put(id, readTopicPartitions(in));
+            switch (usedVersion) {
+                case 1:
+                    decodeVersionOneData(assignmentInfo, in);
+                    break;
+                case 2:
+                    decodeVersionTwoData(assignmentInfo, in);
+                    break;
+                default:
+                    TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode subscription data: " +
+                        "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
+                    log.error(fatalException.getMessage(), fatalException);
+                    throw fatalException;
             }
 
-            Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = 
new HashMap<>();
-            if (version == CURRENT_VERSION) {
-                int numEntries = in.readInt();
-                for (int i = 0; i < numEntries; i++) {
-                    HostInfo hostInfo = new HostInfo(in.readUTF(), 
in.readInt());
-                    hostStateToTopicPartitions.put(hostInfo, 
readTopicPartitions(in));
-                }
-            }
+            return assignmentInfo;
+        } catch (final IOException ex) {
+            throw new TaskAssignmentException("Failed to decode 
AssignmentInfo", ex);
+        }
+    }
+
+    private static void decodeVersionOneData(final AssignmentInfo 
assignmentInfo,
+                                             final DataInputStream in) throws 
IOException {
+        // decode active tasks
+        int count = in.readInt();
+        assignmentInfo.activeTasks = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            assignmentInfo.activeTasks.add(TaskId.readFrom(in));
+        }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, 
hostStateToTopicPartitions);
+        // decode standby tasks
+        count = in.readInt();
+        assignmentInfo.standbyTasks = new HashMap<>(count);
+        for (int i = 0; i < count; i++) {
+            TaskId id = TaskId.readFrom(in);
+            assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
+        }
+    }
 
-        } catch (IOException ex) {
-            throw new TaskAssignmentException("Failed to decode 
AssignmentInfo", ex);
+    private static void decodeVersionTwoData(final AssignmentInfo 
assignmentInfo,
+                                             final DataInputStream in) throws 
IOException {
+        decodeVersionOneData(assignmentInfo, in);
+
+        // decode partitions by host
+        assignmentInfo.partitionsByHost = new HashMap<>();
+        final int numEntries = in.readInt();
+        for (int i = 0; i < numEntries; i++) {
+            final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+            assignmentInfo.partitionsByHost.put(hostInfo, 
readTopicPartitions(in));
         }
     }
 
-    private static Set<TopicPartition> readTopicPartitions(DataInputStream in) 
throws IOException {
-        int numPartitions = in.readInt();
-        Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+    private static Set<TopicPartition> readTopicPartitions(final 
DataInputStream in) throws IOException {
+        final int numPartitions = in.readInt();
+        final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
         for (int j = 0; j < numPartitions; j++) {
             partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
         }
@@ -171,14 +235,14 @@ public static AssignmentInfo decode(ByteBuffer data) {
 
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ 
partitionsByHost.hashCode();
+        return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() 
^ partitionsByHost.hashCode();
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (o instanceof AssignmentInfo) {
-            AssignmentInfo other = (AssignmentInfo) o;
-            return this.version == other.version &&
+            final AssignmentInfo other = (AssignmentInfo) o;
+            return this.usedVersion == other.usedVersion &&
                     this.activeTasks.equals(other.activeTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
                     this.partitionsByHost.equals(other.partitionsByHost);
@@ -189,7 +253,7 @@ public boolean equals(Object o) {
 
     @Override
     public String toString() {
-        return "[version=" + version + ", active tasks=" + activeTasks.size() 
+ ", standby tasks=" + standbyTasks.size() + "]";
+        return "[version=" + usedVersion + ", active tasks=" + 
activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
     }
 
 }
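
With the AssignmentInfo changes above, the fields are reached through
accessors and the version travels with the encoded bytes. A usage sketch of
the encode/decode round trip, written against the classes as they appear in
this diff (these are Streams internals, so the snippet is illustrative only):

    import java.nio.ByteBuffer;
    import java.util.Collections;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
    import org.apache.kafka.streams.state.HostInfo;

    public class AssignmentInfoRoundTrip {
        public static void main(final String[] args) {
            final AssignmentInfo original = new AssignmentInfo(
                Collections.singletonList(new TaskId(0, 0)),                  // active tasks
                Collections.singletonMap(new TaskId(0, 1),
                    Collections.singleton(new TopicPartition("topic1", 1))),  // standby tasks
                Collections.singletonMap(new HostInfo("localhost", 8080),
                    Collections.singleton(new TopicPartition("topic1", 0)))); // partitions by host

            final ByteBuffer bytes = original.encode();
            final AssignmentInfo decoded = AssignmentInfo.decode(bytes);

            // without an explicit version the latest supported version (2) is used
            System.out.println(decoded.version());     // 2
            System.out.println(decoded.activeTasks()); // [0_0]
        }
    }
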
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dbafc94..7fee90b5402 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,42 +31,96 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 2;
 
-    public final int version;
-    public final UUID processId;
-    public final Set<TaskId> prevTasks;
-    public final Set<TaskId> standbyTasks;
-    public final String userEndPoint;
+    private final int usedVersion;
+    private UUID processId;
+    private Set<TaskId> prevTasks;
+    private Set<TaskId> standbyTasks;
+    private String userEndPoint;
 
-    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> 
standbyTasks, String userEndPoint) {
-        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, 
userEndPoint);
+    private SubscriptionInfo(final int version) {
+        this.usedVersion = version;
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> 
prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
-        this.version = version;
+    public SubscriptionInfo(final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, 
userEndPoint);
+    }
+
+    public SubscriptionInfo(final int version,
+                            final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this.usedVersion = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
         this.userEndPoint = userEndPoint;
     }
 
+    public int version() {
+        return usedVersion;
+    }
+
+    public UUID processId() {
+        return processId;
+    }
+
+    public Set<TaskId> prevTasks() {
+        return prevTasks;
+    }
+
+    public Set<TaskId> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public String userEndPoint() {
+        return userEndPoint;
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to encode the data
      */
     public ByteBuffer encode() {
-        byte[] endPointBytes;
-        if (userEndPoint == null) {
-            endPointBytes = new byte[0];
-        } else {
-            endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
+        final ByteBuffer buf;
+
+        switch (usedVersion) {
+            case 1:
+                buf = encodeVersionOne();
+                break;
+            case 2:
+                buf = encodeVersionTwo(prepareUserEndPoint());
+                break;
+            default:
+                throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
+                    + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
         }
-        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process 
id */ + 4 +
-                prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
-                + 4 /* length of bytes */ + endPointBytes.length
-        );
-        // version
-        buf.putInt(version);
+
+        buf.rewind();
+        return buf;
+    }
+
+    private ByteBuffer encodeVersionOne() {
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
+
+        buf.putInt(1); // version
+        encodeVersionOneData(buf);
+
+        return buf;
+    }
+
+    private int getVersionOneByteLength() {
+        return 4 + // version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8; // length + standby tasks
+    }
+
+    private void encodeVersionOneData(final ByteBuffer buf) {
         // encode client UUID
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
@@ -80,60 +134,104 @@ public ByteBuffer encode() {
         for (TaskId id : standbyTasks) {
             id.writeTo(buf);
         }
-        buf.putInt(endPointBytes.length);
-        buf.put(endPointBytes);
-        buf.rewind();
+    }
+
+    private byte[] prepareUserEndPoint() {
+        if (userEndPoint == null) {
+            return new byte[0];
+        } else {
+            return userEndPoint.getBytes(Charset.forName("UTF-8"));
+        }
+    }
+
+    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
+
+        buf.putInt(2); // version
+        encodeVersionTwoData(buf, endPointBytes);
+
         return buf;
     }
 
+    private int getVersionTwoByteLength(final byte[] endPointBytes) {
+        return getVersionOneByteLength() +
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
+    private void encodeVersionTwoData(final ByteBuffer buf,
+                                      final byte[] endPointBytes) {
+        encodeVersionOneData(buf);
+        if (endPointBytes != null) {
+            buf.putInt(endPointBytes.length);
+            buf.put(endPointBytes);
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data
      */
-    public static SubscriptionInfo decode(ByteBuffer data) {
+    public static SubscriptionInfo decode(final ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // Decode version
-        int version = data.getInt();
-        if (version == CURRENT_VERSION || version == 1) {
-            // Decode client UUID
-            UUID processId = new UUID(data.getLong(), data.getLong());
-            // Decode previously active tasks
-            Set<TaskId> prevTasks = new HashSet<>();
-            int numPrevs = data.getInt();
-            for (int i = 0; i < numPrevs; i++) {
-                TaskId id = TaskId.readFrom(data);
-                prevTasks.add(id);
-            }
-            // Decode previously cached tasks
-            Set<TaskId> standbyTasks = new HashSet<>();
-            int numCached = data.getInt();
-            for (int i = 0; i < numCached; i++) {
-                standbyTasks.add(TaskId.readFrom(data));
-            }
-
-            String userEndPoint = null;
-            if (version == CURRENT_VERSION) {
-                int bytesLength = data.getInt();
-                if (bytesLength != 0) {
-                    byte[] bytes = new byte[bytesLength];
-                    data.get(bytes);
-                    userEndPoint = new String(bytes, Charset.forName("UTF-8"));
-                }
-
-            }
-            return new SubscriptionInfo(version, processId, prevTasks, 
standbyTasks, userEndPoint);
+        // decode used version
+        final int usedVersion = data.getInt();
+        final SubscriptionInfo subscriptionInfo = new 
SubscriptionInfo(usedVersion);
+
+        switch (usedVersion) {
+            case 1:
+                decodeVersionOneData(subscriptionInfo, data);
+                break;
+            case 2:
+                decodeVersionTwoData(subscriptionInfo, data);
+                break;
+            default:
+                TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode subscription data: " +
+                    "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
+                log.error(fatalException.getMessage(), fatalException);
+                throw fatalException;
+        }
+
+        return subscriptionInfo;
+    }
 
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable 
to decode subscription data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+    private static void decodeVersionOneData(final SubscriptionInfo 
subscriptionInfo,
+                                             final ByteBuffer data) {
+        // decode client UUID
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+
+        // decode previously active tasks
+        final int numPrevs = data.getInt();
+        subscriptionInfo.prevTasks = new HashSet<>();
+        for (int i = 0; i < numPrevs; i++) {
+            TaskId id = TaskId.readFrom(data);
+            subscriptionInfo.prevTasks.add(id);
+        }
+
+        // decode previously cached tasks
+        final int numCached = data.getInt();
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        for (int i = 0; i < numCached; i++) {
+            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+        }
+    }
+
+    private static void decodeVersionTwoData(final SubscriptionInfo 
subscriptionInfo,
+                                             final ByteBuffer data) {
+        decodeVersionOneData(subscriptionInfo, data);
+
+        // decode user end point (can be null)
+        int bytesLength = data.getInt();
+        if (bytesLength != 0) {
+            final byte[] bytes = new byte[bytesLength];
+            data.get(bytes);
+            subscriptionInfo.userEndPoint = new String(bytes, 
Charset.forName("UTF-8"));
         }
     }
 
     @Override
     public int hashCode() {
-        int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ 
standbyTasks.hashCode();
+        final int hashCode = usedVersion ^ processId.hashCode() ^ 
prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
             return hashCode;
         }
@@ -141,10 +239,10 @@ public int hashCode() {
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (o instanceof SubscriptionInfo) {
-            SubscriptionInfo other = (SubscriptionInfo) o;
-            return this.version == other.version &&
+            final SubscriptionInfo other = (SubscriptionInfo) o;
+            return this.usedVersion == other.usedVersion &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
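
The SubscriptionInfo changes mirror this: the default constructor still
encodes the latest supported version, while the new version-aware constructor
lets a caller produce the older format, which simply omits the user end point
from the wire. A short illustrative sketch against the classes in this diff:

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.UUID;

    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

    public class SubscriptionInfoVersions {
        public static void main(final String[] args) {
            final UUID processId = UUID.randomUUID();

            // default constructor -> latest supported version (2), carries the end point
            final SubscriptionInfo v2 = new SubscriptionInfo(
                processId,
                Collections.singleton(new TaskId(0, 0)),
                Collections.<TaskId>emptySet(),
                "localhost:8080");
            final ByteBuffer v2Bytes = v2.encode();
            System.out.println(SubscriptionInfo.decode(v2Bytes).userEndPoint()); // localhost:8080

            // explicit version 1 -> the end point is not written at all
            final SubscriptionInfo v1 = new SubscriptionInfo(
                1,
                processId,
                Collections.singleton(new TaskId(0, 0)),
                Collections.<TaskId>emptySet(),
                "localhost:8080");
            System.out.println(SubscriptionInfo.decode(v1.encode()).userEndPoint()); // null
        }
    }
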
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 8b4e8957ed5..bb06c72d080 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -376,7 +376,6 @@ public boolean conditionMet() {
         }
     }
 
-
     @Test
     public void queryOnRebalance() throws InterruptedException {
         final int numThreads = STREAM_TWO_PARTITIONS;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index bf3f1d1ac5e..b0c0d68287b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -239,17 +239,17 @@ public void testAssignBasic() throws Exception {
 
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, 
assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
+        allActiveTasks.addAll(info11.activeTasks());
 
         assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
 
         // the third consumer
         AssignmentInfo info20 = checkAssignment(allTopics, 
assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
+        allActiveTasks.addAll(info20.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -317,13 +317,13 @@ public void 
shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws E
         final AssignmentInfo info10 = 
AssignmentInfo.decode(assignments.get("consumer10").userData());
 
         final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, 
taskIdA3, taskIdB1, taskIdB3);
-        assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+        assertEquals(expectedInfo10TaskIds, info10.activeTasks());
 
         // the second consumer
         final AssignmentInfo info11 = 
AssignmentInfo.decode(assignments.get("consumer11").userData());
         final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, 
taskIdA2, taskIdB0, taskIdB2);
 
-        assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+        assertEquals(expectedInfo11TaskIds, info11.activeTasks());
     }
 
     @Test
@@ -354,7 +354,7 @@ public void testAssignWithPartialTopology() throws 
Exception {
         // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -394,7 +394,7 @@ public void testAssignEmptyMetadata() throws Exception {
         // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = 
checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
         assertEquals(Collections.<TaskId>emptySet(), new 
HashSet<>(allActiveTasks));
@@ -407,7 +407,7 @@ public void testAssignEmptyMetadata() throws Exception {
 
         // the first consumer
         info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -455,15 +455,15 @@ public void testAssignWithNewTasks() throws Exception {
         AssignmentInfo info;
 
         info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer10").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer11").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer20").partitions());
 
         assertEquals(allTasks, allActiveTasks);
@@ -524,14 +524,14 @@ public void testAssignWithStates() throws Exception {
         AssignmentInfo info11 = 
AssignmentInfo.decode(assignments.get("consumer11").userData());
         AssignmentInfo info20 = 
AssignmentInfo.decode(assignments.get("consumer20").userData());
 
-        assertEquals(2, info10.activeTasks.size());
-        assertEquals(2, info11.activeTasks.size());
-        assertEquals(2, info20.activeTasks.size());
+        assertEquals(2, info10.activeTasks().size());
+        assertEquals(2, info11.activeTasks().size());
+        assertEquals(2, info20.activeTasks().size());
 
         Set<TaskId> allTasks = new HashSet<>();
-        allTasks.addAll(info10.activeTasks);
-        allTasks.addAll(info11.activeTasks);
-        allTasks.addAll(info20.activeTasks);
+        allTasks.addAll(info10.activeTasks());
+        allTasks.addAll(info11.activeTasks());
+        allTasks.addAll(info20.activeTasks());
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
@@ -603,15 +603,15 @@ public void testAssignWithStandbyReplicas() throws 
Exception {
 
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-        allStandbyTasks.addAll(info10.standbyTasks.keySet());
+        allActiveTasks.addAll(info10.activeTasks());
+        allStandbyTasks.addAll(info10.standbyTasks().keySet());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, 
assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-        allStandbyTasks.addAll(info11.standbyTasks.keySet());
+        allActiveTasks.addAll(info11.activeTasks());
+        allStandbyTasks.addAll(info11.standbyTasks().keySet());
 
-        assertNotEquals("same processId has same set of standby tasks", 
info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
+        assertNotEquals("same processId has same set of standby tasks", 
info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
 
         // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
@@ -619,8 +619,8 @@ public void testAssignWithStandbyReplicas() throws 
Exception {
 
         // the third consumer
         AssignmentInfo info20 = checkAssignment(allTopics, 
assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-        allStandbyTasks.addAll(info20.standbyTasks.keySet());
+        allActiveTasks.addAll(info20.activeTasks());
+        allStandbyTasks.addAll(info20.standbyTasks().keySet());
 
         // all task ids are in the active tasks and also in the standby tasks
 
@@ -847,7 +847,7 @@ public void shouldAddUserDefinedEndPointToSubscription() 
throws Exception {
         
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 (Object) userEndPoint));
         final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = 
SubscriptionInfo.decode(subscription.userData());
-        assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+        assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
     }
 
     @Test
@@ -874,7 +874,7 @@ public void shouldMapUserEndPointToTopicPartitions() throws 
Exception {
         final Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = 
assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = 
AssignmentInfo.decode(consumerAssignment.userData());
-        final Set<TopicPartition> topicPartitions = 
assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
+        final Set<TopicPartition> topicPartitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
                 new TopicPartition("topic1", 2)), topicPartitions);
@@ -1072,8 +1072,8 @@ public void 
shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except
         final Map<String, PartitionAssignor.Assignment> assign = 
partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumer1Assignment = 
assign.get("consumer1");
         final AssignmentInfo assignmentInfo = 
AssignmentInfo.decode(consumer1Assignment.userData());
-        final Set<TopicPartition> consumer1partitions = 
assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
-        final Set<TopicPartition> consumer2Partitions = 
assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
+        final Set<TopicPartition> consumer1partitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
+        final Set<TopicPartition> consumer2Partitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
         final HashSet<TopicPartition> allAssignedPartitions = new 
HashSet<>(consumer1partitions);
         allAssignedPartitions.addAll(consumer2Partitions);
         assertThat(consumer1partitions, not(allPartitions));
@@ -1095,6 +1095,37 @@ public void 
shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProvider
         partitionAssignor.configure(config);
     }
 
+    @Test
+    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws 
Exception {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new 
HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+            )
+        );
+
+        mockTaskManager(Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            UUID.randomUUID(),
+            new InternalTopologyBuilder());
+        partitionAssignor.configure(configProps());
+        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+
+        assertThat(assignment.size(), equalTo(2));
+        
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(),
 equalTo(1));
+        
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(),
 equalTo(1));
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, 
Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new 
AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, 
Set<TopicPartition>>emptyMap(),
@@ -1111,7 +1142,7 @@ private AssignmentInfo checkAssignment(Set<String> 
expectedTopics, PartitionAssi
         AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
 
         // check if the number of assigned partitions == the size of active 
task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+        assertEquals(assignment.partitions().size(), 
info.activeTasks().size());
 
         // check if active tasks are consistent
         List<TaskId> activeTasks = new ArrayList<>();
@@ -1121,14 +1152,14 @@ private AssignmentInfo checkAssignment(Set<String> 
expectedTopics, PartitionAssi
             activeTasks.add(new TaskId(0, partition.partition()));
             activeTopics.add(partition.topic());
         }
-        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(activeTasks, info.activeTasks());
 
         // check if active partitions cover all topics
         assertEquals(expectedTopics, activeTopics);
 
         // check if standby tasks are consistent
         Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
info.standbyTasks.entrySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
info.standbyTasks().entrySet()) {
             TaskId id = entry.getKey();
             Set<TopicPartition> partitions = entry.getValue();
             for (TopicPartition partition : partitions) {
@@ -1139,7 +1170,7 @@ private AssignmentInfo checkAssignment(Set<String> 
expectedTopics, PartitionAssi
             }
         }
 
-        if (info.standbyTasks.size() > 0) {
+        if (info.standbyTasks().size() > 0) {
             // check if standby partitions cover all topics
             assertEquals(expectedTopics, standbyTopics);
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ec94ad81acd..726a5623cd5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -33,6 +33,7 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class AssignmentInfoTest {
 
@@ -61,10 +62,10 @@ public void shouldDecodePreviousVersion() throws 
IOException {
         standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new 
TopicPartition("t3", 0), new TopicPartition("t3", 0)));
         final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, 
standbyTasks, null);
         final AssignmentInfo decoded = 
AssignmentInfo.decode(encodeV1(oldVersion));
-        assertEquals(oldVersion.activeTasks, decoded.activeTasks);
-        assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
-        assertEquals(0, decoded.partitionsByHost.size()); // should be empty 
as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on 
decode;
+        assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
+        assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
+        assertNull(decoded.partitionsByHost()); // should be null as wasn't in 
V1
+        assertEquals(1, decoded.version());
     }
 
 
@@ -76,15 +77,15 @@ private ByteBuffer encodeV1(AssignmentInfo oldVersion) 
throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(baos);
         // Encode version
-        out.writeInt(oldVersion.version);
+        out.writeInt(oldVersion.version());
         // Encode active tasks
-        out.writeInt(oldVersion.activeTasks.size());
-        for (TaskId id : oldVersion.activeTasks) {
+        out.writeInt(oldVersion.activeTasks().size());
+        for (TaskId id : oldVersion.activeTasks()) {
             id.writeTo(out);
         }
         // Encode standby tasks
-        out.writeInt(oldVersion.standbyTasks.size());
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
oldVersion.standbyTasks.entrySet()) {
+        out.writeInt(oldVersion.standbyTasks().size());
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
oldVersion.standbyTasks().entrySet()) {
             TaskId id = entry.getKey();
             id.writeTo(out);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 9c011bb0cae..633285a2b4d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -65,14 +65,12 @@ public void shouldBeBackwardCompatible() {
 
         final ByteBuffer v1Encoding = encodePreviousVersion(processId, 
activeTasks, standbyTasks);
         final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
-        assertEquals(activeTasks, decode.prevTasks);
-        assertEquals(standbyTasks, decode.standbyTasks);
-        assertEquals(processId, decode.processId);
-        assertNull(decode.userEndPoint);
-
+        assertEquals(activeTasks, decode.prevTasks());
+        assertEquals(standbyTasks, decode.standbyTasks());
+        assertEquals(processId, decode.processId());
+        assertNull(decode.userEndPoint());
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has 
changed for V2
      * so it is impossible to test compatibility without having this


 

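A recurring pattern in the diff above is the switch from direct access to public fields (info.activeTasks, info.standbyTasks, subscriptionInfo.userEndPoint) to accessor methods (activeTasks(), standbyTasks(), userEndPoint()), together with the version/usedVersion rename. Below is a minimal, self-contained sketch of that encapsulation; the class name, field types, and method bodies are illustrative assumptions, not the real AssignmentInfo code:

{code}
import java.util.Collections;
import java.util.List;

// Sketch of the field-to-getter encapsulation applied throughout the diff above.
// Names echo AssignmentInfo, but the types and bodies are simplified assumptions.
public class AssignmentInfoSketch {

    private final int usedVersion;            // previously the public field "version"
    private final List<Integer> activeTasks;  // the real field holds TaskId instances

    public AssignmentInfoSketch(final int usedVersion, final List<Integer> activeTasks) {
        this.usedVersion = usedVersion;
        this.activeTasks = activeTasks;
    }

    // Callers go through accessors, so the internal representation (and the
    // version handling needed for the upgrade path) can change later without
    // touching every call site and test again.
    public int version() {
        return usedVersion;
    }

    public List<Integer> activeTasks() {
        return Collections.unmodifiableList(activeTasks);
    }
}
{code}

Hiding the fields behind getters is presumably what allows the follow-up upgrade-path fix to change how the version is stored and encoded without another sweep over the tests touched here.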
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.
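For context, the new shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions test in the diff above encodes two subscriptions with versions 1 and 2 and asserts that both assignments decode as version 1. A minimal sketch of that "use the lowest version any member sent" idea follows; the class, constant, and buffer layout (a leading version int) are assumptions for illustration, not the actual Kafka Streams implementation:

{code}
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;

// Illustrative sketch only: during a rolling upgrade the group leader can see
// subscriptions encoded with different metadata versions. Encoding every
// assignment with the smallest version seen keeps old members able to decode it,
// instead of failing with "unable to decode subscription data: version=2".
public class VersionNegotiationSketch {

    static final int LATEST_SUPPORTED_VERSION = 2; // assumed for this sketch

    // Assumes the subscription metadata starts with a version int, as the
    // stack trace and the SubscriptionInfo diff above suggest.
    static int subscriptionVersion(final ByteBuffer subscriptionUserData) {
        return subscriptionUserData.duplicate().getInt();
    }

    // The leader caps the assignment version at the smallest version any member sent.
    static int commonAssignmentVersion(final Collection<ByteBuffer> allSubscriptionUserData) {
        int minVersion = LATEST_SUPPORTED_VERSION;
        for (final ByteBuffer userData : allSubscriptionUserData) {
            minVersion = Math.min(minVersion, subscriptionVersion(userData));
        }
        return minVersion;
    }

    public static void main(final String[] args) {
        final ByteBuffer v1 = (ByteBuffer) ByteBuffer.allocate(4).putInt(1).flip();
        final ByteBuffer v2 = (ByteBuffer) ByteBuffer.allocate(4).putInt(2).flip();
        // One old member and one new member -> everything is encoded as version 1.
        System.out.println(commonAssignmentVersion(Arrays.asList(v1, v2))); // prints 1
    }
}
{code}

This also matches the AssignmentInfoTest change above, where a V1-encoded assignment now decodes back as version 1 with a null partitionsByHost, instead of being silently upgraded to version 2 on decode.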



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
