[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442058#comment-16442058 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4880: KAFKA-6054: Update Kafka Streams metadata to version 3
URL: https://github.com/apache/kafka/pull/4880
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/build.gradle b/build.gradle
index f8daf2fdddc..5b0e6496c2e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
   }
 }
 
+project(':streams:upgrade-system-tests-11') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+
+  dependencies {
+    testCompile libs.kafkaStreams_11
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index effe763ac45..a6ef5dddeec 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -67,6 +67,7 @@ versions += [
   kafka_0102: "0.10.2.1",
   kafka_0110: "0.11.0.2",
   kafka_10: "1.0.1",
+  kafka_11: "1.1.0",
   lz4: "1.4.1",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -115,6 +116,7 @@ libs += [
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
+  kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 03136849fd5..2a7977cfc93 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,5 +15,6 @@
 
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
-        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
-        'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
+        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
+        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+        'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd43b6..65b1da6dede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -172,6 +172,31 @@
      */
     public static final String UPGRADE_FROM_0100 = "0.10.0";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+     */
+    public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+     */
+    public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+     */
+    public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+     */
+    public static final String UPGRADE_FROM_10 = "1.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+     */
+    public static final String UPGRADE_FROM_11 = "1.1";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -347,8 +372,9 @@
 
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
-        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
+        "When upgrading from 1.2 to a newer version it is not required to specify this config. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
 
     /**
      * {@code value.serde}
@@ -364,7 +390,7 @@
 
     /**
      * {@code zookeeper.connect}
-     * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
+     * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
      */
     @Deprecated
     public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
@@ -575,7 +601,7 @@
             .define(UPGRADE_FROM_CONFIG,
                     ConfigDef.Type.STRING,
                     null,
-                    in(null, UPGRADE_FROM_0100),
+                    in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11),
                     Importance.LOW,
                     UPGRADE_FROM_DOC)
             .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
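
For illustration (not part of the PR diff): the new constants are meant to be set via "upgrade.from" during the first rolling bounce of an upgrade and removed again for the second bounce. A minimal sketch in Java, assuming an application previously running on 1.0.x; the application id and bootstrap servers are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromConfigSketch {
    public static Properties firstBounceConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // First rolling bounce: new instances keep sending the old metadata version
        // so they can cooperate with the not-yet-upgraded 1.0.x instances.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_10);
        return props;
    }

    public static Properties secondBounceConfig() {
        final Properties props = firstBounceConfig();
        // Second rolling bounce: drop upgrade.from so the group switches to the
        // latest metadata version (3).
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        return props;
    }
}
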
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 97771e56879..c81105ef821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -199,10 +199,24 @@ public void configure(final Map<String, ?> configs) {
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
-        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
-            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
-            userMetadataVersion = 1;
+        final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (upgradeFrom != null) {
+            switch (upgradeFrom) {
+                case StreamsConfig.UPGRADE_FROM_0100:
+                    log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    userMetadataVersion = 1;
+                    break;
+                case StreamsConfig.UPGRADE_FROM_0101:
+                case StreamsConfig.UPGRADE_FROM_0102:
+                case StreamsConfig.UPGRADE_FROM_0110:
+                case StreamsConfig.UPGRADE_FROM_10:
+                case StreamsConfig.UPGRADE_FROM_11:
+                    log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    userMetadataVersion = 2;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
+            }
         }
 
         final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
@@ -512,7 +526,7 @@ public Subscription subscription(final Set<String> topics) {
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new 
HashMap<>();
-        if (minUserMetadataVersion == 2) {
+        if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
             for (final Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -631,6 +645,10 @@ public void onAssignment(final Assignment assignment) {
                 processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
+            case 3:
+                processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
                     + "; latest supported version: " + 
AssignmentInfo.LATEST_SUPPORTED_VERSION);
@@ -684,6 +702,13 @@ private void processVersionTwoAssignment(final 
AssignmentInfo info,
         }
     }
 
+    private void processVersionThreeAssignment(final AssignmentInfo info,
+                                               final List<TopicPartition> 
partitions,
+                                               final Map<TaskId, 
Set<TopicPartition>> activeTasks,
+                                               final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
+        processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+    }
+
     /**
      * Internal helper function that creates a Kafka topic
      *
@@ -818,4 +843,5 @@ void validate(final Set<String> copartitionGroup,
     void setInternalTopicManager(final InternalTopicManager 
internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c8df7498755..3c5cee2bfc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.HostInfo;
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,15 +41,20 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(AssignmentInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
     private final int usedVersion;
+    private final int latestSupportedVersion;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    private AssignmentInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private AssignmentInfo(final int version,
+                           final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public AssignmentInfo(final List<TaskId> activeTasks,
@@ -57,11 +63,33 @@ public AssignmentInfo(final List<TaskId> activeTasks,
         this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
     }
 
+    public AssignmentInfo() {
+        this(LATEST_SUPPORTED_VERSION,
+            Collections.<TaskId>emptyList(),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+            Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+    }
+
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
                           final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
hostState);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and 
" + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    AssignmentInfo(final int version,
+                   final int latestSupportedVersion,
+                   final List<TaskId> activeTasks,
+                   final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                   final Map<HostInfo, Set<TopicPartition>> hostState) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
@@ -71,6 +99,10 @@ public int version() {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public List<TaskId> activeTasks() {
         return activeTasks;
     }
@@ -98,6 +130,9 @@ public ByteBuffer encode() {
                 case 2:
                     encodeVersionTwo(out);
                     break;
+                case 3:
+                    encodeVersionThree(out);
+                    break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: 
" + usedVersion
                         + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
@@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream 
out,
         }
     }
 
+    private void encodeVersionThree(final DataOutputStream out) throws 
IOException {
+        out.writeInt(3);
+        out.writeInt(LATEST_SUPPORTED_VERSION);
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or 
if the data version is unknown
      */
@@ -169,19 +211,25 @@ public static AssignmentInfo decode(final ByteBuffer 
data) {
         data.rewind();
 
         try (final DataInputStream in = new DataInputStream(new 
ByteBufferInputStream(data))) {
-            // decode used version
-            final int usedVersion = in.readInt();
-            final AssignmentInfo assignmentInfo = new 
AssignmentInfo(usedVersion);
+            final AssignmentInfo assignmentInfo;
 
+            final int usedVersion = in.readInt();
             switch (usedVersion) {
                 case 1:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionOneData(assignmentInfo, in);
                     break;
                 case 2:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
+                case 3:
+                    final int latestSupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, 
latestSupportedVersion);
+                    decodeVersionThreeData(assignmentInfo, in);
+                    break;
                 default:
-                    TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode subscription data: " +
+                    TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
                     log.error(fatalException.getMessage(), fatalException);
                     throw fatalException;
@@ -195,15 +243,23 @@ public static AssignmentInfo decode(final ByteBuffer 
data) {
 
     private static void decodeVersionOneData(final AssignmentInfo 
assignmentInfo,
                                              final DataInputStream in) throws 
IOException {
-        // decode active tasks
-        int count = in.readInt();
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        assignmentInfo.partitionsByHost = new HashMap<>();
+    }
+
+    private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
+                                          final DataInputStream in) throws 
IOException {
+        final int count = in.readInt();
         assignmentInfo.activeTasks = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             assignmentInfo.activeTasks.add(TaskId.readFrom(in));
         }
+    }
 
-        // decode standby tasks
-        count = in.readInt();
+    private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
+                                           final DataInputStream in) throws 
IOException {
+        final int count = in.readInt();
         assignmentInfo.standbyTasks = new HashMap<>(count);
         for (int i = 0; i < count; i++) {
             TaskId id = TaskId.readFrom(in);
@@ -213,9 +269,13 @@ private static void decodeVersionOneData(final 
AssignmentInfo assignmentInfo,
 
     private static void decodeVersionTwoData(final AssignmentInfo 
assignmentInfo,
                                              final DataInputStream in) throws 
IOException {
-        decodeVersionOneData(assignmentInfo, in);
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
 
-        // decode partitions by host
+    private static void decodeGlobalAssignmentData(final AssignmentInfo 
assignmentInfo,
+                                                   final DataInputStream in) 
throws IOException {
         assignmentInfo.partitionsByHost = new HashMap<>();
         final int numEntries = in.readInt();
         for (int i = 0; i < numEntries; i++) {
@@ -233,19 +293,27 @@ private static void decodeVersionTwoData(final 
AssignmentInfo assignmentInfo,
         return partitions;
     }
 
+    private static void decodeVersionThreeData(final AssignmentInfo 
assignmentInfo,
+                                               final DataInputStream in) 
throws IOException {
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
+
     @Override
     public int hashCode() {
-        return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() 
^ partitionsByHost.hashCode();
+        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ 
standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
     public boolean equals(final Object o) {
         if (o instanceof AssignmentInfo) {
             final AssignmentInfo other = (AssignmentInfo) o;
-            return this.usedVersion == other.usedVersion &&
-                    this.activeTasks.equals(other.activeTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks) &&
-                    this.partitionsByHost.equals(other.partitionsByHost);
+            return usedVersion == other.usedVersion &&
+                    latestSupportedVersion == other.latestSupportedVersion &&
+                    activeTasks.equals(other.activeTasks) &&
+                    standbyTasks.equals(other.standbyTasks) &&
+                    partitionsByHost.equals(other.partitionsByHost);
         } else {
             return false;
         }
@@ -253,7 +321,11 @@ public boolean equals(final Object o) {
 
     @Override
     public String toString() {
-        return "[version=" + usedVersion + ", active tasks=" + 
activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+        return "[version=" + usedVersion
+            + ", supported version=" + latestSupportedVersion
+            + ", active tasks=" + activeTasks
+            + ", standby tasks=" + standbyTasks
+            + ", global assignment=" + partitionsByHost + "]";
     }
 
 }
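
For illustration (not part of the PR diff): a round trip through the new version-3 assignment encoding might look roughly like the sketch below; the task id, host, and topic are made-up values.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.HostInfo;

public class AssignmentInfoV3Sketch {
    public static void main(final String[] args) {
        // LATEST_SUPPORTED_VERSION is now 3, so this constructor encodes version 3.
        final AssignmentInfo info = new AssignmentInfo(
            Collections.singletonList(new TaskId(0, 0)),               // active tasks (made up)
            Collections.<TaskId, Set<TopicPartition>>emptyMap(),       // no standby tasks
            Collections.singletonMap(new HostInfo("localhost", 8080),  // global assignment (made up)
                Collections.singleton(new TopicPartition("topic1", 0))));

        // Version 3 writes the used version followed by the latest supported version.
        final ByteBuffer bytes = info.encode();
        final AssignmentInfo decoded = AssignmentInfo.decode(bytes);
        System.out.println(decoded.version());                 // 3
        System.out.println(decoded.latestSupportedVersion());  // 3
    }
}
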
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 7fee90b5402..be709472441 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -23,6 +23,7 @@
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,16 +32,21 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
     private final int usedVersion;
+    private final int latestSupportedVersion;
     private UUID processId;
     private Set<TaskId> prevTasks;
     private Set<TaskId> standbyTasks;
     private String userEndPoint;
 
-    private SubscriptionInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private SubscriptionInfo(final int version,
+                             final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public SubscriptionInfo(final UUID processId,
@@ -55,7 +61,23 @@ public SubscriptionInfo(final int version,
                             final Set<TaskId> prevTasks,
                             final Set<TaskId> standbyTasks,
                             final String userEndPoint) {
+        this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, 
standbyTasks, userEndPoint);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and 
" + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    protected SubscriptionInfo(final int version,
+                               final int latestSupportedVersion,
+                               final UUID processId,
+                               final Set<TaskId> prevTasks,
+                               final Set<TaskId> standbyTasks,
+                               final String userEndPoint) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
@@ -66,6 +88,10 @@ public int version() {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public UUID processId() {
         return processId;
     }
@@ -93,7 +119,10 @@ public ByteBuffer encode() {
                 buf = encodeVersionOne();
                 break;
             case 2:
-                buf = encodeVersionTwo(prepareUserEndPoint());
+                buf = encodeVersionTwo();
+                break;
+            case 3:
+                buf = encodeVersionThree();
                 break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
@@ -108,7 +137,9 @@ private ByteBuffer encodeVersionOne() {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
 
         buf.putInt(1); // version
-        encodeVersionOneData(buf);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
 
         return buf;
     }
@@ -120,18 +151,15 @@ private int getVersionOneByteLength() {
                4 + standbyTasks.size() * 8; // length + standby tasks
     }
 
-    private void encodeVersionOneData(final ByteBuffer buf) {
-        // encode client UUID
+    private void encodeClientUUID(final ByteBuffer buf) {
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
-        // encode ids of previously running tasks
-        buf.putInt(prevTasks.size());
-        for (TaskId id : prevTasks) {
-            id.writeTo(buf);
-        }
-        // encode ids of cached tasks
-        buf.putInt(standbyTasks.size());
-        for (TaskId id : standbyTasks) {
+    }
+
+    private void encodeTasks(final ByteBuffer buf,
+                             final Collection<TaskId> taskIds) {
+        buf.putInt(taskIds.size());
+        for (TaskId id : taskIds) {
             id.writeTo(buf);
         }
     }
@@ -144,52 +172,87 @@ private void encodeVersionOneData(final ByteBuffer buf) {
         }
     }
 
-    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+    private ByteBuffer encodeVersionTwo() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
         final ByteBuffer buf = 
ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
 
         buf.putInt(2); // version
-        encodeVersionTwoData(buf, endPointBytes);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
 
         return buf;
     }
 
     private int getVersionTwoByteLength(final byte[] endPointBytes) {
-        return getVersionOneByteLength() +
+        return 4 + // version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
                4 + endPointBytes.length; // length + userEndPoint
     }
 
-    private void encodeVersionTwoData(final ByteBuffer buf,
-                                      final byte[] endPointBytes) {
-        encodeVersionOneData(buf);
+    private void encodeUserEndPoint(final ByteBuffer buf,
+                                    final byte[] endPointBytes) {
         if (endPointBytes != null) {
             buf.putInt(endPointBytes.length);
             buf.put(endPointBytes);
         }
     }
 
+    private ByteBuffer encodeVersionThree() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+        buf.putInt(3); // used version
+        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
+
+        return buf;
+    }
+
+    private int getVersionThreeByteLength(final byte[] endPointBytes) {
+        return 4 + // used version
+               4 + // latest supported version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data
      */
     public static SubscriptionInfo decode(final ByteBuffer data) {
+        final SubscriptionInfo subscriptionInfo;
+
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // decode used version
         final int usedVersion = data.getInt();
-        final SubscriptionInfo subscriptionInfo = new 
SubscriptionInfo(usedVersion);
-
         switch (usedVersion) {
             case 1:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionOneData(subscriptionInfo, data);
                 break;
             case 2:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
+            case 3:
+                final int latestSupportedVersion = data.getInt();
+                subscriptionInfo = new SubscriptionInfo(usedVersion, 
latestSupportedVersion);
+                decodeVersionThreeData(subscriptionInfo, data);
+                break;
             default:
-                TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode subscription data: " +
-                    "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
-                log.error(fatalException.getMessage(), fatalException);
-                throw fatalException;
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
+                log.info("Unable to decode subscription data: used version: 
{}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
         }
 
         return subscriptionInfo;
@@ -197,30 +260,43 @@ public static SubscriptionInfo decode(final ByteBuffer 
data) {
 
     private static void decodeVersionOneData(final SubscriptionInfo 
subscriptionInfo,
                                              final ByteBuffer data) {
-        // decode client UUID
-        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+        decodeClientUUID(subscriptionInfo, data);
 
-        // decode previously active tasks
-        final int numPrevs = data.getInt();
         subscriptionInfo.prevTasks = new HashSet<>();
-        for (int i = 0; i < numPrevs; i++) {
-            TaskId id = TaskId.readFrom(data);
-            subscriptionInfo.prevTasks.add(id);
-        }
+        decodeTasks(subscriptionInfo.prevTasks, data);
 
-        // decode previously cached tasks
-        final int numCached = data.getInt();
         subscriptionInfo.standbyTasks = new HashSet<>();
-        for (int i = 0; i < numCached; i++) {
-            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+    }
+
+    private static void decodeClientUUID(final SubscriptionInfo 
subscriptionInfo,
+                                         final ByteBuffer data) {
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+    }
+
+    private static void decodeTasks(final Collection<TaskId> taskIds,
+                                    final ByteBuffer data) {
+        final int numPrevs = data.getInt();
+        for (int i = 0; i < numPrevs; i++) {
+            taskIds.add(TaskId.readFrom(data));
         }
     }
 
     private static void decodeVersionTwoData(final SubscriptionInfo 
subscriptionInfo,
                                              final ByteBuffer data) {
-        decodeVersionOneData(subscriptionInfo, data);
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
 
-        // decode user end point (can be null)
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
+
+    private static void decodeUserEndPoint(final SubscriptionInfo 
subscriptionInfo,
+                                           final ByteBuffer data) {
         int bytesLength = data.getInt();
         if (bytesLength != 0) {
             final byte[] bytes = new byte[bytesLength];
@@ -229,9 +305,21 @@ private static void decodeVersionTwoData(final 
SubscriptionInfo subscriptionInfo
         }
     }
 
-    @Override
+    private static void decodeVersionThreeData(final SubscriptionInfo 
subscriptionInfo,
+                                               final ByteBuffer data) {
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
+
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
+
     public int hashCode() {
-        final int hashCode = usedVersion ^ processId.hashCode() ^ 
prevTasks.hashCode() ^ standbyTasks.hashCode();
+        final int hashCode = usedVersion ^ latestSupportedVersion ^ 
processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
             return hashCode;
         }
@@ -243,6 +331,7 @@ public boolean equals(final Object o) {
         if (o instanceof SubscriptionInfo) {
             final SubscriptionInfo other = (SubscriptionInfo) o;
             return this.usedVersion == other.usedVersion &&
+                    this.latestSupportedVersion == 
other.latestSupportedVersion &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
@@ -252,4 +341,13 @@ public boolean equals(final Object o) {
         }
     }
 
+    @Override
+    public String toString() {
+        return "[version=" + usedVersion
+            + ", supported version=" + latestSupportedVersion
+            + ", process ID=" + processId
+            + ", prev tasks=" + prevTasks
+            + ", standby tasks=" + standbyTasks
+            + ", user endpoint=" + userEndPoint + "]";
+    }
 }
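
For illustration (not part of the PR diff): the version-3 subscription layout adds the latest supported version right after the used version, and decode() now tolerates unknown newer versions instead of throwing. A rough sketch; the process id and endpoint are made-up values.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

public class SubscriptionInfoV3Sketch {
    public static void main(final String[] args) {
        final SubscriptionInfo info = new SubscriptionInfo(
            UUID.randomUUID(),                        // process id (made up)
            Collections.singleton(new TaskId(0, 0)),  // previously owned tasks
            Collections.<TaskId>emptySet(),           // no standby tasks
            "localhost:8080");                        // user endpoint (made up)

        final ByteBuffer bytes = info.encode();
        bytes.rewind();
        System.out.println(bytes.getInt());  // 3 (used version)
        System.out.println(bytes.getInt());  // 3 (latest supported version)

        final SubscriptionInfo decoded = SubscriptionInfo.decode(bytes);
        System.out.println(decoded.latestSupportedVersion());  // 3
        // A subscription written by a future, unknown version no longer throws:
        // decode() logs it and reports latestSupportedVersion() as UNKNOWN (-1).
    }
}
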
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index e9ed9682066..4e04b4985ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -46,7 +46,6 @@
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -64,6 +63,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class StreamsPartitionAssignorTest {
 
@@ -867,9 +867,12 @@ public void shouldMapUserEndPointToTopicPartitions() {
         final PartitionAssignor.Assignment consumerAssignment = 
assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = 
AssignmentInfo.decode(consumerAssignment.userData());
         final Set<TopicPartition> topicPartitions = 
assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
-        assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
+        assertEquals(
+            Utils.mkSet(
+                new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
-                new TopicPartition("topic1", 2)), topicPartitions);
+                new TopicPartition("topic1", 2)),
+            topicPartitions);
     }
 
     @Test
@@ -881,7 +884,7 @@ public void 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
 
         try {
             
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 (Object) "localhost"));
-            Assert.fail("expected to an exception due to invalid config");
+            fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
         }
@@ -893,7 +896,7 @@ public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
 
         try {
             
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 (Object) "localhost:j87yhk"));
-            Assert.fail("expected to an exception due to invalid config");
+            fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
         }
@@ -1088,21 +1091,36 @@ public void 
shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProvider
     }
 
     @Test
-    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 
2);
+    }
+
+    @Test
+    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 
3);
+    }
+
+    @Test
+    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 
3);
+    }
+
+    private void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int 
smallestVersion,
+                                                                               
      final int otherVersion) {
         final Map<String, PartitionAssignor.Subscription> subscriptions = new 
HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put(
             "consumer1",
             new PartitionAssignor.Subscription(
                 Collections.singletonList("topic1"),
-                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+                new SubscriptionInfo(smallestVersion, UUID.randomUUID(), 
emptyTasks, emptyTasks, null).encode()
             )
         );
         subscriptions.put(
             "consumer2",
             new PartitionAssignor.Subscription(
                 Collections.singletonList("topic1"),
-                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+                new SubscriptionInfo(otherVersion, UUID.randomUUID(), 
emptyTasks, emptyTasks, null).encode()
             )
         );
 
@@ -1115,12 +1133,12 @@ public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(
         final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
 
         assertThat(assignment.size(), equalTo(2));
-        
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(),
 equalTo(1));
-        
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(),
 equalTo(1));
+        
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(),
 equalTo(smallestVersion));
+        
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(),
 equalTo(smallestVersion));
     }
 
     @Test
-    public void shouldDownGradeSubscription() {
+    public void shouldDownGradeSubscriptionToVersion1() {
         final Set<TaskId> emptyTasks = Collections.emptySet();
 
         mockTaskManager(
@@ -1135,6 +1153,46 @@ public void shouldDownGradeSubscription() {
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(1));
     }
 
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0101() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0102() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0110() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For10() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For11() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11);
+    }
+
+    private void shouldDownGradeSubscriptionToVersion2(final Object 
upgradeFromValue) {
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+
+        mockTaskManager(
+            emptyTasks,
+            emptyTasks,
+            UUID.randomUUID(),
+            builder);
+        
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 upgradeFromValue));
+
+        PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(2));
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, 
Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new 
AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, 
Set<TopicPartition>>emptyMap(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index c1020a98ba9..c7382e7671c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -22,85 +22,70 @@
 import org.apache.kafka.streams.state.HostInfo;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class AssignmentInfoTest {
+    private final List<TaskId> activeTasks = Arrays.asList(
+        new TaskId(0, 0),
+        new TaskId(0, 0),
+        new TaskId(0, 1), new TaskId(1, 0));
+    private final Map<TaskId, Set<TopicPartition>> standbyTasks = new 
HashMap<TaskId, Set<TopicPartition>>() {
+        {
+            put(new TaskId(1, 1),
+                Utils.mkSet(new TopicPartition("t1", 1), new 
TopicPartition("t2", 1)));
+            put(new TaskId(2, 0),
+                Utils.mkSet(new TopicPartition("t3", 0), new 
TopicPartition("t3", 0)));
+        }
+    };
+    private final Map<HostInfo, Set<TopicPartition>> globalAssignment = new 
HashMap<HostInfo, Set<TopicPartition>>() {
+        {
+            put(new HostInfo("localhost", 80),
+                Utils.mkSet(new TopicPartition("t1", 1), new 
TopicPartition("t3", 3)));
+        }
+    };
 
     @Test
-    public void testEncodeDecode() {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new 
TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new 
TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new 
TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+    public void shouldUseLatestSupportedVersionByDefault() {
+        final AssignmentInfo info = new AssignmentInfo(activeTasks, 
standbyTasks, globalAssignment);
+        assertEquals(AssignmentInfo.LATEST_SUPPORTED_VERSION, info.version());
+    }
 
-        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, 
new HashMap<HostInfo, Set<TopicPartition>>());
-        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion1() {
+        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment);
+    }
 
-        assertEquals(info, decoded);
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion2() {
+        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, 
activeTasks, standbyTasks, globalAssignment);
     }
 
     @Test
-    public void shouldDecodePreviousVersion() throws IOException {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new 
TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new 
TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new 
TopicPartition("t3", 0), new TopicPartition("t3", 0)));
-        final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, 
standbyTasks, null);
-        final AssignmentInfo decoded = 
AssignmentInfo.decode(encodeV1(oldVersion));
-        assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
-        assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
-        assertNull(decoded.partitionsByHost()); // should be null as wasn't in 
V1
-        assertEquals(1, decoded.version());
+    public void shouldEncodeAndDecodeVersion1() {
+        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, 
standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(1, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, 
Set<TopicPartition>>emptyMap());
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
-    /**
-     * This is a clone of what the V1 encoding did. The encode method has 
changed for V2
-     * so it is impossible to test compatibility without having this
-     */
-    private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-        // Encode version
-        out.writeInt(oldVersion.version());
-        // Encode active tasks
-        out.writeInt(oldVersion.activeTasks().size());
-        for (TaskId id : oldVersion.activeTasks()) {
-            id.writeTo(out);
-        }
-        // Encode standby tasks
-        out.writeInt(oldVersion.standbyTasks().size());
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : 
oldVersion.standbyTasks().entrySet()) {
-            TaskId id = entry.getKey();
-            id.writeTo(out);
-
-            Set<TopicPartition> partitions = entry.getValue();
-            out.writeInt(partitions.size());
-            for (TopicPartition partition : partitions) {
-                out.writeUTF(partition.topic());
-                out.writeInt(partition.partition());
-            }
-        }
-
-        out.flush();
-        out.close();
-
-        return ByteBuffer.wrap(baos.toByteArray());
+    @Test
+    public void shouldEncodeAndDecodeVersion2() {
+        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, 
standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(2, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion3() {
+        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, 
standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(3, 
AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
globalAssignment);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 633285a2b4d..e98b8ce0727 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -19,81 +19,60 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class SubscriptionInfoTest {
+    private final UUID processId = UUID.randomUUID();
+    private final Set<TaskId> activeTasks = new HashSet<>(Arrays.asList(
+        new TaskId(0, 0),
+        new TaskId(0, 1),
+        new TaskId(1, 0)));
+    private final Set<TaskId> standbyTasks = new HashSet<>(Arrays.asList(
+        new TaskId(1, 1),
+        new TaskId(2, 0)));
 
-    @Test
-    public void testEncodeDecode() {
-        UUID processId = UUID.randomUUID();
+    private final static String IGNORED_USER_ENDPOINT = 
"ignoredUserEndpoint:80";
 
-        Set<TaskId> activeTasks =
-                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 
1), new TaskId(1, 0)));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 
0)));
+    @Test
+    public void shouldUseLatestSupportedVersionByDefault() {
+        final SubscriptionInfo info = new SubscriptionInfo(processId, 
activeTasks, standbyTasks, "localhost:80");
+        assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION, 
info.version());
+    }
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, 
standbyTasks, null);
-        SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion1() {
+        new SubscriptionInfo(0, processId, activeTasks, standbyTasks, 
"localhost:80");
+    }
 
-        assertEquals(info, decoded);
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion2() {
+        new SubscriptionInfo(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, 
processId, activeTasks, standbyTasks, "localhost:80");
     }
 
     @Test
-    public void shouldEncodeDecodeWithUserEndPoint() {
-        SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
-                Collections.singleton(new TaskId(0, 0)), 
Collections.<TaskId>emptySet(), "localhost:80");
-        SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
-        assertEquals(original, decoded);
+    public void shouldEncodeAndDecodeVersion1() {
+        final SubscriptionInfo info = new SubscriptionInfo(1, processId, 
activeTasks, standbyTasks, IGNORED_USER_ENDPOINT);
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(1, 
SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, null);
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
     @Test
-    public void shouldBeBackwardCompatible() {
-        UUID processId = UUID.randomUUID();
-
-        Set<TaskId> activeTasks =
-                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 
1), new TaskId(1, 0)));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 
0)));
-
-        final ByteBuffer v1Encoding = encodePreviousVersion(processId, 
activeTasks, standbyTasks);
-        final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
-        assertEquals(activeTasks, decode.prevTasks());
-        assertEquals(standbyTasks, decode.standbyTasks());
-        assertEquals(processId, decode.processId());
-        assertNull(decode.userEndPoint());
+    public void shouldEncodeAndDecodeVersion2() {
+        final SubscriptionInfo info = new SubscriptionInfo(2, processId, 
activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(2, 
SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
-    /**
-     * This is a clone of what the V1 encoding did. The encode method has 
changed for V2
-     * so it is impossible to test compatibility without having this
-     */
-    private ByteBuffer encodePreviousVersion(UUID processId,  Set<TaskId> 
prevTasks, Set<TaskId> standbyTasks) {
-        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process 
id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-        // version
-        buf.putInt(1);
-        // encode client UUID
-        buf.putLong(processId.getMostSignificantBits());
-        buf.putLong(processId.getLeastSignificantBits());
-        // encode ids of previously running tasks
-        buf.putInt(prevTasks.size());
-        for (TaskId id : prevTasks) {
-            id.writeTo(buf);
-        }
-        // encode ids of cached tasks
-        buf.putInt(standbyTasks.size());
-        for (TaskId id : standbyTasks) {
-            id.writeTo(buf);
-        }
-        buf.rewind();
-
-        return buf;
+    @Test
+    public void shouldEncodeAndDecodeVersion3() {
+        final SubscriptionInfo info = new SubscriptionInfo(3, processId, 
activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(3, 
SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, 
standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
+
 }
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..a8796cb056a
--- /dev/null
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed as long as Kafka 1.1.1 is not released.
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument 
(kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v1.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data 
taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a5be816c737..e0e445de22a 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -413,6 +413,7 @@ def __init__(self, test_context, kafka):
                                                                  "org.apache.kafka.streams.tests.StreamsUpgradeTest",
                                                                  "")
         self.UPGRADE_FROM = None
+        self.UPGRADE_TO = None
 
     def set_version(self, kafka_streams_version):
         self.KAFKA_STREAMS_VERSION = kafka_streams_version
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index fa79d571f36..8b7d7712459 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,8 +23,16 @@
 import random
 import time
 
+# broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
-simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)]
+
+metadata_1_versions = [str(LATEST_0_10_0)]
+metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding
+# bug-fix releases 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available:
+# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)
+backward_compatible_metadata_2_versions = []
+metadata_3_versions = [str(DEV_VERSION)]
 
 class StreamsUpgradeTest(Test):
     """
@@ -39,6 +47,7 @@ def __init__(self, test_context):
             'echo' : { 'partitions': 5 },
             'data' : { 'partitions': 5 },
         }
+        self.leader = None
 
     def perform_broker_upgrade(self, to_version):
         self.logger.info("First pass bounce - rolling broker upgrade")
@@ -114,7 +123,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
         self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
+    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
     def test_simple_upgrade_downgrade(self, from_version, to_version):
         """
         Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
@@ -165,15 +174,12 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
 
         self.driver.stop()
 
-    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
-    #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
-    #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
-    #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released
-    #@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released
-    @parametrize(new_version=str(DEV_VERSION))
-    def test_metadata_upgrade(self, new_version):
+    #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
+    @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
+    @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
+    def test_metadata_upgrade(self, from_version, to_version):
         """
-        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
         """
 
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
@@ -189,7 +195,7 @@ def test_metadata_upgrade(self, new_version):
         self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
 
         self.driver.start()
-        self.start_all_nodes_with(str(LATEST_0_10_0))
+        self.start_all_nodes_with(from_version)
 
         self.processors = [self.processor1, self.processor2, self.processor3]
 
@@ -200,13 +206,13 @@ def test_metadata_upgrade(self, new_version):
         random.shuffle(self.processors)
         for p in self.processors:
             p.CLEAN_NODE_ENABLED = False
-            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            self.do_rolling_bounce(p, from_version[:-2], to_version, counter)
             counter = counter + 1
 
         # second rolling bounce
         random.shuffle(self.processors)
         for p in self.processors:
-            self.do_rolling_bounce(p, None, new_version, counter)
+            self.do_rolling_bounce(p, None, to_version, counter)
             counter = counter + 1
 
         # shutdown
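
The two rolling bounces driven by the test above correspond to the client-side procedure from KIP-268: first restart every instance on the new jars with `upgrade.from` set to the old version, then restart them all again with that config removed. Below is a minimal sketch of the application-side properties for the two phases, assuming `UPGRADE_FROM_0100` as the version being left behind; the class name, application id, and bootstrap servers are placeholders for illustration:

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class RollingUpgradeConfigSketch {

    // Builds the Streams properties for one phase of the two-bounce upgrade.
    static Properties propsForPhase(final boolean firstBounce) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        if (firstBounce) {
            // first rolling bounce: new jars, but keep sending the old metadata version
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
        }
        // second rolling bounce: upgrade.from stays unset, so the group switches to the new metadata
        return props;
    }
}
{code}

This mirrors what the system test automates with UPGRADE_FROM/UPGRADE_TO: the first bounce keeps the group on the old protocol while jars are swapped, and the second bounce lets the upgraded members negotiate the newer metadata version.
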
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 66e5fcf18aa..7823efac1d4 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -63,17 +63,17 @@ def get_version(node=None):
 DEV_BRANCH = KafkaVersion("dev")
 DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT")
 
-# 0.8.2.X versions
+# 0.8.2.x versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
 V_0_8_2_2 = KafkaVersion("0.8.2.2")
 LATEST_0_8_2 = V_0_8_2_2
 
-# 0.9.0.X versions
+# 0.9.0.x versions
 V_0_9_0_0 = KafkaVersion("0.9.0.0")
 V_0_9_0_1 = KafkaVersion("0.9.0.1")
 LATEST_0_9 = V_0_9_0_1
 
-# 0.10.0.X versions
+# 0.10.0.x versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
 V_0_10_0_1 = KafkaVersion("0.10.0.1")
 LATEST_0_10_0 = V_0_10_0_1


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.
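
The stack trace above comes from the 0.10.0.0 side: when such an instance happens to be the group leader, it receives a version=2 subscription from a newer member and has no way to parse it. Roughly, the old decode path amounts to a guard like the following; this is an illustration only, with invented names and a generic exception standing in for TaskAssignmentException, not the literal 0.10.0.0 implementation:

{code:java}
import java.nio.ByteBuffer;

// Illustration of why an old leader rejects newer subscription metadata.
final class LegacyDecodeSketch {
    static final int LATEST_SUPPORTED_VERSION = 1; // all a 0.10.0.0 instance knows about

    static void decode(final ByteBuffer data) {
        final int usedVersion = data.getInt();
        if (usedVersion > LATEST_SUPPORTED_VERSION) {
            // a 0.10.2.1 member sends version=2, so a 0.10.0.0 leader ends up here
            throw new IllegalStateException("unable to decode subscription data: version=" + usedVersion);
        }
        // ... otherwise read process id, previously assigned tasks, and standby tasks ...
    }
}
{code}

KIP-268 avoids this for future upgrades by having newer members also advertise their latest supported version, so the leader can choose a metadata version that every member understands.
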



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
