[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414965#comment-16414965
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4761:  KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4761
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fe6aefd7321..8e2ba91bf2b 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname 
$0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # 
remove last char, ie, bug-fix number
+  for file in 
"$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index ce4b4e44cb2..17f3e00358d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -909,6 +909,42 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0102
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 7111bad6054..7102414628a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -25,10 +27,7 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL 
mechanism
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 7f2c9f6cf89..86d6d531a9c 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -27,16 +27,33 @@ <h1>Upgrade Guide &amp; API Changes</h1>
     </p>
 
     <p>
-        If you want to upgrade from 0.10.1.x to 0.10.2, see the <a 
href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 
0.10.2</b></a>.
+        If you want to upgrade from 0.10.1.x to 0.11.0, see the <a 
href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 
0.10.2</b></a>.
         It highlights incompatible changes you need to consider to upgrade 
your code and application.
-        See <a href="#streams_api_changes_0102">below</a> a complete list of 
0.10.2 API and semantical changes that allow you to advance your application 
and/or simplify your code base, including the usage of new features.
+        See below a complete list of <a 
href="#streams_api_changes_0102">0.10.2</a> and <a 
href="#streams_api_changes_0110">0.11.0</a> API and semantical changes
+        that allow you to advance your application and/or simplify your code 
base, including the usage of new features.
     </p>
 
     <p>
-        If you want to upgrade from 0.10.0.x to 0.10.1, see the <a 
href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 
0.10.1</b></a>.
-        It highlights incompatible changes you need to consider to upgrade 
your code and application.
-        See <a href="#streams_api_changes_0101">below</a> a complete list of 
0.10.1 API changes that allow you to advance your application and/or simplify 
your code base, including the usage of new features.
+        Upgrading from 0.10.0.x to 0.11.0.x directly is also possible.
+        Note, that a brokers must be on version 0.10.1 or higher to run a 
Kafka Streams application version 0.10.1 or higher.
+        See <a href="#streams_api_changes_0101">Streams API changes in 
0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 
0.10.2</a>,
+        and <a href="#streams_api_changes_0110">Streams API changes in 
0.11.0</a> for a complete list of API changes.
+        Upgrading to 0.11.0.3 requires two rolling bounces with config 
<code>upgrade.from="0.10.0"</code> set for first upgrade phase
+        (cf. <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade";>KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
     </p>
+    <ul>
+        <li> prepare your application instances for a rolling bounce and make 
sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for 
new version 0.11.0.3 </li>
+        <li> bounce each instance of your application once </li>
+        <li> prepare your newly deployed 0.11.0.3 application instances for a 
second round of rolling bounces; make sure to remove the value for config 
<code>upgrade.mode</code> </li>
+        <li> bounce each instance of your application once more to complete 
the upgrade </li>
+    </ul>
+    <p> Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an 
offline upgrade (rolling bounce upgrade is not supported) </p>
+    <ul>
+        <li> stop all old (0.10.0.x) application instances </li>
+        <li> update your code and swap old code and jar file with new code and 
new jar file </li>
+        <li> restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application 
instances </li>
+    </ul>
 
     <h3><a id="streams_api_changes_0110" 
href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9f0dbdf55fb..06038753189 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,6 +64,12 @@ <h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading 
from 0.8.x, 0.9.x, 0
     before you switch to 0.11.0.</li>
 </ol>
 
+<h5><a id="upgrade_1103_notable" href="#upgrade_1103_notable">Notable changes 
in 0.11.0.3</a></h5>
+<ul>
+    <li> New Kafka Streams configuration parameter <code>upgrade.from</code> 
added that allows rolling bounce upgrade from version 0.10.0.x </li>
+    <li> See the <a 
href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams 
upgrade guide</b></a> for details about this new config.
+</ul>
+
 <h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes 
in 0.11.0.0</a></h5>
 <ul>
     <li>Unclean leader election is now disabled by default. The new default 
favors durability over availability. Users who wish to
@@ -214,14 +220,41 @@ <h5><a id="upgrade_1020_streams" 
href="#upgrade_1020_streams">Upgrading a 0.10.1
     <li> See <a 
href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API 
changes in 0.10.2</a> for more details. </li>
 </ul>
 
+<h5><a id="upgrade_1020_streams_from_0100" 
href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams 
Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require 
a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 
application can only connect to 0.10.2 or 0.10.1 brokers. </li>
+    <li> There are couple of API changes, that are not backward compatible 
(cf. <a 
href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API 
changes in 0.10.2</a> for more details).
+         Thus, you need to update and recompile your code. Just swapping the 
Kafka Streams library jar file will not work and will break your application. 
</li>
+    <li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with 
config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade";>KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and 
make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> 
for new version 0.10.2.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.2.2 application instances 
for a second round of rolling bounces; make sure to remove the value for config 
<code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to 
complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline 
upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code 
and new jar file </li>
+            <li> restart all new (0.10.2.0 or 0.10.2.1) application instances 
</li>
+        </ul>
+    </li>
+</ul>
+
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable 
changes in 0.10.2.2</a></h5>
+<ul>
+    <li> New configuration parameter <code>upgrade.from</code> added that 
allows rolling bounce upgrade from version 0.10.0.x </li>
+</ul>
+
 <h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable 
changes in 0.10.2.1</a></h5>
 <ul>
   <li> The default values for two configurations of the StreamsConfig class 
were changed to improve the resiliency of Kafka Streams applications. The 
internal Kafka Streams producer <code>retries</code> default value was changed 
from 0 to 10. The internal Kafka Streams consumer 
<code>max.poll.interval.ms</code>  default value was changed from 300000 to 
<code>Integer.MAX_VALUE</code>.
   </li>
 </ul>
 
-
-
 <h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes 
in 0.10.2.0</a></h5>
 <ul>
     <li>The Java clients (producer and consumer) have acquired the ability to 
communicate with older brokers. Version 0.10.2 clients
@@ -294,6 +327,23 @@ <h5><a id="upgrade_1010_streams" 
href="#upgrade_1010_streams">Upgrading a 0.10.0
     <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require 
a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 
application can only connect to 0.10.1 brokers. </li>
     <li> There are couple of API changes, that are not backward compatible 
(cf. <a 
href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API 
changes in 0.10.1</a> for more details).
          Thus, you need to update and recompile your code. Just swapping the 
Kafka Streams library jar file will not work and will break your application. 
</li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with 
config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade";>KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and 
make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> 
for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances 
for a second round of rolling bounces; make sure to remove the value for config 
<code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to 
complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline 
upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code 
and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances 
</li>
+        </ul>
+    </li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes 
in 0.10.1.0</a></h5>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 5d145e17ed9..d881353703f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -55,6 +55,9 @@ versions += [
   jackson: "2.8.5",
   jetty: "9.2.22.v20170606",
   jersey: "2.24",
+  kafka_0100: "0.10.0.1",
+  kafka_0101: "0.10.1.1",
+  kafka_0102: "0.10.2.1",
   log4j: "1.2.17",
   jopt: "5.0.3",
   junit: "4.12",
@@ -96,6 +99,9 @@ libs += [
   junit: "junit:junit:$versions.junit",
   log4j: "log4j:log4j:$versions.log4j",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+  kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
+  kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   lz4: "net.jpountz.lz4:lz4:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/settings.gradle b/settings.gradle
index f0fdf07128c..769046fe556 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 
'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 
'streams:upgrade-system-tests-0100',
+        'streams:upgrade-system-tests-0101', 
'streams:upgrade-system-tests-0102', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 
'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b411344aaac..d45b1357f0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -105,6 +105,11 @@
      */
     public static final String PRODUCER_PREFIX = "producer.";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} 
for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG 
"processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -247,6 +252,11 @@
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = 
"timestamp.extractor";
     private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp 
extractor class that implements the <code>TimestampExtractor</code> interface. 
This config is deprecated, use <code>" + 
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";
 
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from 
version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" 
(for upgrading from 0.10.0.x).";
+
     /**
      * {@code value.serde}
      * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
@@ -466,6 +476,12 @@
                     null,
                     Importance.LOW,
                     TIMESTAMP_EXTRACTOR_CLASS_DOC)
+            .define(UPGRADE_FROM_CONFIG,
+                    ConfigDef.Type.STRING,
+                    null,
+                    in(null, UPGRADE_FROM_0100),
+                    Importance.LOW,
+                    UPGRADE_FROM_DOC)
             .define(VALUE_SERDE_CLASS_CONFIG,
                     Type.CLASS,
                     null,
@@ -632,6 +648,7 @@ public StreamsConfig(final Map<?, ?> props) {
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-consumer");
 
         // add configs required for stream partition assignor
+        consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
         consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         consumerProps.put(REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 0a1b2ab76cb..6e2bfa67bac 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -165,6 +165,8 @@ public int compare(TopicPartition p1, TopicPartition p2) {
     private String userEndPoint;
     private int numStandbyReplicas;
 
+    private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
+
     private Cluster metadataWithInternalTopics;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
 
@@ -192,6 +194,12 @@ void time(final Time time) {
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) 
configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+        final String upgradeMode = (String) 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+            log.info("Downgrading metadata version from 2 to 1 for upgrade 
from 0.10.0.x.");
+            userMetadataVersion = 1;
+        }
+
         Object o = 
configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not 
specified");
@@ -251,7 +259,7 @@ public Subscription subscription(Set<String> topics) {
         final Set<TaskId> previousActiveTasks = streamThread.prevActiveTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(previousActiveTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, 
previousActiveTasks, standbyTasks, this.userEndPoint);
+        SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, 
streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
 
         if (streamThread.builder.sourceTopicPattern() != null &&
             
!streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
@@ -295,11 +303,16 @@ private void updateSubscribedTopics(Set<String> topics) {
         // construct the client metadata from the decoded subscription info
         Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
 
+        int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) 
{
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
             SubscriptionInfo info = 
SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version;
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
 
             // create the new client metadata if necessary
             ClientMetadata clientMetadata = 
clientsMetadata.get(info.processId);
@@ -556,7 +569,7 @@ private void updateSubscribedTopics(Set<String> topics) {
                 }
 
                 // finally, encode the assignment before sending back to 
coordinator
-                assignment.put(consumer, new Assignment(activePartitions, new 
AssignmentInfo(active, standby, partitionsByHostState).encode()));
+                assignment.put(consumer, new Assignment(activePartitions, new 
AssignmentInfo(minUserMetadataVersion, active, standby, 
partitionsByHostState).encode()));
                 i++;
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 77fb58a113c..5409976d686 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -55,7 +55,7 @@ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, 
Set<TopicPartition>>
         this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, 
Map<TaskId, Set<TopicPartition>> standbyTasks,
+    public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, 
Set<TopicPartition>> standbyTasks,
                              Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
@@ -154,9 +154,7 @@ public static AssignmentInfo decode(ByteBuffer data) {
                 }
             }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, 
hostStateToTopicPartitions);
-
-
+            return new AssignmentInfo(version, activeTasks, standbyTasks, 
hostStateToTopicPartitions);
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode 
AssignmentInfo", ex);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dbafc94..00227e799b8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,7 +31,7 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
@@ -43,7 +43,7 @@ public SubscriptionInfo(UUID processId, Set<TaskId> 
prevTasks, Set<TaskId> stand
         this(CURRENT_VERSION, processId, prevTasks, standbyTasks, 
userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> 
prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+    public SubscriptionInfo(int version, UUID processId, Set<TaskId> 
prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3bbd69ea4d0..9998283928c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -82,7 +82,7 @@ public void shouldThrowExceptionIfBootstrapServersIsNotSet() {
     }
 
     @Test
-    public void testGetProducerConfigs() throws Exception {
+    public void testGetProducerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = 
streamsConfig.getProducerConfigs(clientId);
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), 
clientId + "-producer");
@@ -91,7 +91,7 @@ public void testGetProducerConfigs() throws Exception {
     }
 
     @Test
-    public void testGetConsumerConfigs() throws Exception {
+    public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
         final Map<String, Object> returnedProps = 
streamsConfig.getConsumerConfigs(null, groupId, clientId);
@@ -102,7 +102,7 @@ public void testGetConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
+    public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = 
streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), 
clientId + "-restore-consumer");
@@ -143,7 +143,7 @@ public void shouldSupportMultipleBootstrapServers() {
     }
 
     @Test
-    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 
1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -153,7 +153,7 @@ public void shouldSupportPrefixedConsumerConfigs() throws 
Exception {
     }
 
     @Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception 
{
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 
1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -163,7 +163,7 @@ public void shouldSupportPrefixedRestoreConsumerConfigs() 
throws Exception {
     }
 
     @Test
-    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws 
Exception {
+    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -171,7 +171,7 @@ public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() thro
     }
 
     @Test
-    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws 
Exception {
+    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -179,7 +179,7 @@ public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig
     }
 
     @Test
-    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws 
Exception {
+    public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
@@ -188,7 +188,7 @@ public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() thro
 
 
     @Test
-    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 
1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -198,7 +198,7 @@ public void shouldSupportPrefixedProducerConfigs() throws 
Exception {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -208,7 +208,7 @@ public void shouldBeSupportNonPrefixedConsumerConfigs() 
throws Exception {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws 
Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -218,7 +218,7 @@ public void 
shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception
     }
 
     @Test
-    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportNonPrefixedProducerConfigs() {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -227,24 +227,22 @@ public void shouldSupportNonPrefixedProducerConfigs() 
throws Exception {
         assertEquals(1, 
configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
-
-
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws 
Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.defaultKeySerde();
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws 
Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.defaultValueSerde();
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception 
{
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest");
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -254,7 +252,7 @@ public void shouldOverrideStreamsDefaultConsumerConfigs() 
throws Exception {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception 
{
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
         
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 
"10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
@@ -262,7 +260,7 @@ public void shouldOverrideStreamsDefaultProducerConfigs() 
throws Exception {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() 
throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() 
{
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest");
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -272,21 +270,21 @@ public void 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throw
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws 
Exception {
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
 "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getConsumerConfigs(null, "a", "b");
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() 
throws Exception {
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
 "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getRestoreConsumerConfigs("client");
     }
 
     @Test
-    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() 
throws Exception {
+    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
CoreMatchers.<Object>equalTo(false));
@@ -395,6 +393,7 @@ public void 
shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
         
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), 
equalTo(commitIntervalMs));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -429,6 +428,7 @@ public void shouldUseCorrectDefaultsWhenNoneSpecified() {
         assertTrue(config.defaultTimestampExtractor() instanceof 
FailOnInvalidTimestamp);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -442,6 +442,7 @@ public void 
shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
@@ -455,6 +456,7 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -468,6 +470,7 @@ public void 
shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectValueSerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 372b89c048e..bb4b5758cfb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -51,7 +52,6 @@
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import kafka.utils.MockTime;
 import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -315,6 +315,4 @@ private void startStreams() {
 
     }
 
-
-
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 98cd20a3527..a29380fecd0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -135,7 +135,7 @@ public void setUp() {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testSubscription() throws Exception {
+    public void testSubscription() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source1", "source2");
@@ -187,7 +187,7 @@ public void testSubscription() throws Exception {
     }
 
     @Test
-    public void testAssignBasic() throws Exception {
+    public void testAssignBasic() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source1", "source2");
@@ -239,12 +239,10 @@ public void testAssignBasic() throws Exception {
         assertEquals(Utils.mkSet(t1p2, t2p2), new 
HashSet<>(assignments.get("consumer20").partitions()));
 
         // check assignment info
-
-        Set<TaskId> allActiveTasks = new HashSet<>();
+        AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
 
         // the first consumer
-        AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, 
assignments.get("consumer11"));
@@ -264,7 +262,7 @@ public void testAssignBasic() throws Exception {
     }
 
     @Test
-    public void testAssignWithPartialTopology() throws Exception {
+    public void testAssignWithPartialTopology() {
         Properties props = configProps();
         props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, 
SingleGroupPartitionGrouperStub.class);
         StreamsConfig config = new StreamsConfig(props);
@@ -306,9 +304,8 @@ public void testAssignWithPartialTopology() throws 
Exception {
         Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -316,7 +313,7 @@ public void testAssignWithPartialTopology() throws 
Exception {
 
 
     @Test
-    public void testAssignEmptyMetadata() throws Exception {
+    public void testAssignEmptyMetadata() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source1", "source2");
@@ -359,9 +356,8 @@ public void testAssignEmptyMetadata() throws Exception {
             new HashSet<>(assignments.get("consumer10").partitions()));
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = 
checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
 
         assertEquals(0, allActiveTasks.size());
         assertEquals(Collections.<TaskId>emptySet(), new 
HashSet<>(allActiveTasks));
@@ -384,7 +380,7 @@ public void testAssignEmptyMetadata() throws Exception {
     }
 
     @Test
-    public void testAssignWithNewTasks() throws Exception {
+    public void testAssignWithNewTasks() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addSource("source3", "topic3");
@@ -430,13 +426,9 @@ public void testAssignWithNewTasks() throws Exception {
         // check assigned partitions: since there is no previous task for 
topic 3 it will be assigned randomly so we cannot check exact match
         // also note that previously assigned partitions / tasks may not stay 
on the previous host since we may assign the new task first and
         // then later ones will be re-assigned to other hosts due to load 
balancing
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TopicPartition> allPartitions = new HashSet<>();
-        AssignmentInfo info;
-
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks);
-        allPartitions.addAll(assignments.get("consumer10").partitions());
+        AssignmentInfo info = 
AssignmentInfo.decode(assignments.get("consumer10").userData());
+        Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks);
+        Set<TopicPartition> allPartitions = new 
HashSet<>(assignments.get("consumer10").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer11").userData());
         allActiveTasks.addAll(info.activeTasks);
@@ -451,7 +443,7 @@ public void testAssignWithNewTasks() throws Exception {
     }
 
     @Test
-    public void testAssignWithStates() throws Exception {
+    public void testAssignWithStates() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addSource("source1", "topic1");
@@ -551,7 +543,7 @@ public void testAssignWithStates() throws Exception {
     }
 
     @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
+    public void testAssignWithStandbyReplicas() {
         Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
@@ -600,13 +592,10 @@ public void testAssignWithStandbyReplicas() throws 
Exception {
 
         Map<String, PartitionAssignor.Assignment> assignments = 
partitionAssignor.assign(metadata, subscriptions);
 
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TaskId> allStandbyTasks = new HashSet<>();
-
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, 
assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-        allStandbyTasks.addAll(info10.standbyTasks.keySet());
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
+        Set<TaskId> allStandbyTasks = new 
HashSet<>(info10.standbyTasks.keySet());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, 
assignments.get("consumer11"));
@@ -634,7 +623,7 @@ public void testAssignWithStandbyReplicas() throws 
Exception {
     }
 
     @Test
-    public void testOnAssignment() throws Exception {
+    public void testOnAssignment() {
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -677,7 +666,7 @@ public void testOnAssignment() throws Exception {
     }
 
     @Test
-    public void testAssignWithInternalTopics() throws Exception {
+    public void testAssignWithInternalTopics() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
@@ -722,7 +711,7 @@ public void testAssignWithInternalTopics() throws Exception 
{
     }
 
     @Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() 
throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() 
{
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
@@ -760,7 +749,7 @@ public void 
testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throw
     }
 
     @Test
-    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+    public void shouldAddUserDefinedEndPointToSubscription() {
         final Properties properties = configProps();
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, 
"localhost:8080");
         final StreamsConfig config = new StreamsConfig(properties);
@@ -773,8 +762,8 @@ public void shouldAddUserDefinedEndPointToSubscription() 
throws Exception {
         final UUID uuid1 = UUID.randomUUID();
         final String client1 = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0, stateDirectory);
+        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1,
+            uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0, 
stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client1));
         final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("input"));
@@ -783,7 +772,82 @@ public void shouldAddUserDefinedEndPointToSubscription() 
throws Exception {
     }
 
     @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new 
HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, 
emptyTasks, null).encode()
+            )
+        );
+
+        final StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
+
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
"test", "clientId"));
+        final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
+
+        assertEquals(2, assignment.size());
+        assertEquals(1, 
AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+        assertEquals(1, 
AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+    }
+
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, 
StreamsConfig.UPGRADE_FROM_0100);
+        StreamsConfig config = new StreamsConfig(properties);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+
+        String clientId = "client-id";
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
+
+        StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
"test", clientId));
+
+        PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertEquals(1, 
SubscriptionInfo.decode(subscription.userData()).version);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:8080";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -831,7 +895,7 @@ public void shouldMapUserEndPointToTopicPartitions() throws 
Exception {
     }
 
     @Test
-    public void 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws 
Exception {
+    public void 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -865,7 +929,7 @@ public void 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() thr
     }
 
     @Test
-    public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws 
Exception {
+    public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:j87yhk";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -897,7 +961,7 @@ public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() th
     }
 
     @Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws 
Exception {
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
         List<TopicPartition> topic = Collections.singletonList(new 
TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
@@ -910,7 +974,7 @@ public void 
shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exceptio
     }
 
     @Test
-    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+    public void shouldSetClusterMetadataOnAssignment() {
         final List<TopicPartition> topic = Collections.singletonList(new 
TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
@@ -930,7 +994,7 @@ public void shouldSetClusterMetadataOnAssignment() throws 
Exception {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws 
Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
     }
@@ -1039,11 +1103,11 @@ public Object apply(Object value1, Object value2) {
             new TopicPartition(applicationId + "-count-repartition", 1),
             new TopicPartition(applicationId + "-count-repartition", 2)
         );
-        assertThat(new HashSet(assignment.get(client).partitions()), 
equalTo(new HashSet(expectedAssignment)));
+        assertThat(new HashSet<>(assignment.get(client).partitions()), 
equalTo(new HashSet<>(expectedAssignment)));
     }
 
     @Test
-    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws 
Exception {
+    public void shouldUpdatePartitionHostInfoMapOnAssignment() {
         final TopicPartition partitionOne = new TopicPartition("topic", 1);
         final TopicPartition partitionTwo = new TopicPartition("topic", 2);
         final Map<HostInfo, Set<TopicPartition>> firstHostState = 
Collections.singletonMap(
@@ -1060,7 +1124,7 @@ public void 
shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
     }
 
     @Test
-    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
+    public void shouldUpdateClusterMetadataOnAssignment() {
         final TopicPartition topicOne = new TopicPartition("topic", 1);
         final TopicPartition topicTwo = new TopicPartition("topic2", 2);
         final Map<HostInfo, Set<TopicPartition>> firstHostState = 
Collections.singletonMap(
@@ -1076,7 +1140,7 @@ public void shouldUpdateClusterMetadataOnAssignment() 
throws Exception {
     }
 
     @Test
-    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws 
Exception {
+    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         final StreamsConfig config = new StreamsConfig(props);
@@ -1135,12 +1199,12 @@ public void 
shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws 
Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
         
partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
 1));
     }
 
     @Test(expected = KafkaException.class)
-    public void 
shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws 
Exception {
+    public void 
shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() {
         final Map<String, Object> config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am 
not a stream thread");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 9473a4027c4..361dde87776 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,10 +64,9 @@ public void shouldDecodePreviousVersion() throws Exception {
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
         assertEquals(0, decoded.partitionsByHost.size()); // should be empty 
as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on 
decode;
+        assertEquals(1, decoded.version);
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has 
changed for V2
      * so it is impossible to test compatibility without having this
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 11e1ae86fc2..303061541f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ int next() {
     // This main() is not used by the system test. It is intended to be used 
for local debugging.
     public static void main(String[] args) throws Exception {
         final String kafka = "localhost:9092";
-        final String zookeeper = "localhost:2181";
         final File stateDir = TestUtils.tempDirectory();
 
         final int numKeys = 20;
@@ -131,42 +130,50 @@ public void run() {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int 
numKeys, final int maxRecordsPerKey) throws Exception {
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int 
maxRecordsPerKey,
+                                                     final boolean 
autoTerminate) throws Exception {
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);
-        // the next 4 config values make sure that all records are produced 
with no loss and
-        // no duplicates
+        // the next 2 config values make sure that all records are produced 
with no loss and no duplicates
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
-        KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+        final KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
 
         int numRecordsProduced = 0;
 
-        Map<String, Set<Integer>> allData = new HashMap<>();
-        ValueList[] data = new ValueList[numKeys];
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
             allData.put(data[i].key, new HashSet<Integer>());
         }
-        Random rand = new Random();
+        final Random rand = new Random();
 
-        int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> 
is false
+        if (autoTerminate) {
+            remaining = data.length;
+        }
 
         while (remaining > 0) {
-            int index = rand.nextInt(remaining);
-            String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : 
rand.nextInt(numKeys);
+            final String key = data[index].key;
             int value = data[index].next();
 
-            if (value < 0) {
+            if (autoTerminate && value < 0) {
                 remaining--;
                 data[index] = data[remaining];
             } else {
 
-                ProducerRecord<byte[], byte[]> record =
-                        new ProducerRecord<>("data", 
stringSerde.serializer().serialize("", key), 
intSerde.serializer().serialize("", value));
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("data", 
stringSerde.serializer().serialize("", key), 
intSerde.serializer().serialize("", value));
 
                 producer.send(record, new Callback() {
                     @Override
@@ -178,11 +185,12 @@ public void onCompletion(final RecordMetadata metadata, 
final Exception exceptio
                     }
                 });
 
-
                 numRecordsProduced++;
                 allData.get(key).add(value);
-                if (numRecordsProduced % 100 == 0)
+
+                if (numRecordsProduced % 100 == 0) {
                     System.out.println(numRecordsProduced + " records 
produced");
+                }
                 Utils.sleep(2);
 
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 150ec7d0c26..11845b4024c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -44,20 +44,15 @@
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
-                    private ProcessorContext context;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + 
topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
-                        this.context = context;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
-                        if (printOffset) {
-                            System.out.println(">>> " + context.offset());
-                        }
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println("processed " + 
numRecordsProcessed + " records from topic=" + topic);
@@ -65,10 +60,10 @@ public void process(final Object key, final Object value) {
                     }
 
                     @Override
-                    public void punctuate(final long timestamp) { }
+                    public void punctuate(final long timestamp) {}
 
                     @Override
-                    public void close() { }
+                    public void close() {}
                 };
             }
         };
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 244aa8eef6e..699aaeba287 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -23,7 +23,7 @@
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= command kafka zookeeper stateDir disableAutoTerminate
      *  command := "run" | "process"
      *
      * @param args
@@ -32,11 +32,13 @@ public static void main(String[] args) throws Exception {
         String kafka = args[0];
         String stateDir = args.length > 1 ? args[1] : null;
         String command = args.length > 2 ? args[2] : null;
+        boolean disableAutoTerminate = args.length > 3;
 
-        System.out.println("StreamsTest instance started");
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
             case "standalone":
@@ -46,8 +48,12 @@ public static void main(String[] args) throws Exception {
                 // this starts the driver (data generation and result 
verification)
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = 
SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, 
false);
+                } else {
+                    Map<String, Set<Integer>> allData = 
SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
                 break;
             case "process":
                 // this starts a KafkaStreams client
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..0ee47e416ff
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires two argument 
(kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " 
provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String stateDir = args[1];
+        final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
trunk)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+        }
+
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git 
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
 
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..72d7f5a7b04
--- /dev/null
+++ 
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument 
(kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data 
taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git 
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
 
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..eebd0fab83c
--- /dev/null
+++ 
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be run executed, as long as Kafka 0.10.1.2 is not 
released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument 
(kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + 
args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+        final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            // TODO: because Kafka 0.10.1.2 is not released yet, thus 
`UPGRADE_FROM_CONFIG` is not available yet
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, 
upgradeFrom);
+            config.setProperty("upgrade.from", upgradeFrom);
+        }
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data 
taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git 
a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
 
b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..18240f04ff1
--- /dev/null
+++ 
b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be run executed, as long as Kafka 0.10.2.2 is not 
released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument 
(kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " 
provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String stateDir = args[1];
+        final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.2)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            // TODO: because Kafka 0.10.2.2 is not released yet, thus 
`UPGRADE_FROM_CONFIG` is not available yet
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, 
upgradeFrom);
+            config.setProperty("upgrade.from", upgradeFrom);
+        }
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data 
taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index e6f692b171d..eeb16816367 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -20,6 +20,7 @@
 from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
 
 
 class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, 
Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
     }
 
     def __init__(self, test_context, kafka, streams_class_name, 
user_test_args, user_test_args1=None, user_test_args2=None):
@@ -107,7 +218,8 @@ def wait_node(self, node, timeout_sec=None):
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, 
allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, 
allow_fail=False)
 
     def start_cmd(self, node):
         args = self.args.copy()
@@ -163,7 +275,28 @@ def __init__(self, test_context, kafka, command):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, 
kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
+
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
 
+        cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s 
%(streams_class_name)s " \
+              " %(kafka)s %(state_dir)s %(user_test_args)s 
%(disable_auto_terminate)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" 
% args
+
+        return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -206,3 +339,41 @@ def __init__(self, test_context, kafka, eosEnabled):
                                                                 kafka,
                                                                 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                                                                 eosEnabled)
+
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 
"org.apache.kafka.streams.tests.StreamsUpgradeTest",
+                                                                 "")
+        self.UPGRADE_FROM = ""
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or 
self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+            args['zk'] = self.kafka.zk.connect_setting()
+        else:
+            args['zk'] = ""
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['upgrade_from'] = self.UPGRADE_FROM
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true 
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s 
%(upgrade_from)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" 
% args
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 00000000000..7aa2de67d53
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
DEV_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, 
num_brokers=1, topics={
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, 
self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, 
self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, 
self.kafka)
+
+    @parametrize(old_version=str(LATEST_0_10_1), 
new_version=str(LATEST_0_10_2))
+    @parametrize(old_version=str(LATEST_0_10_1), new_version=str(DEV_VERSION))
+    @parametrize(old_version=str(LATEST_0_10_2), new_version=str(DEV_VERSION))
+    def test_simple_upgrade(self, old_version, new_version):
+        """
+        Starts 3 KafkaStreams instances with <old_version>, and upgrades 
one-by-one to <new_verion>
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(old_version)
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "", new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 
'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test 
until Kafka 0.10.1.2 is released
+    #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test 
until Kafka 0.10.2.2 is released
+    @parametrize(new_version=str(DEV_VERSION))
+    def test_metadata_upgrade(self, new_version):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades 
one-by-one to <new_version>
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(str(LATEST_0_10_0))
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, "", new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 
'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with(self, version):
+        # start first with <version>
+        self.prepare_for(self.processor1, version)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as 
log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : " + version,
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams 
version " + version + " " + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 
records from topic' on" + str(node1.account))
+
+        # start second with <version>
+        self.prepare_for(self.processor2, version)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as 
first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as 
second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as 
log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : " + version,
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka 
Streams version " + version + " " + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from 
topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 
'processed 100 records from topic' on" + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from 
topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 
'processed 100 records from topic' on" + str(node2.account))
+
+        # start third with <version>
+        self.prepare_for(self.processor3, version)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as 
first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as 
second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as 
third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) 
as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : " + version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka 
Streams version " + version + " " + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from 
topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 
'processed 100 records from topic' on" + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from 
topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 
'processed 100 records from topic' on" + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from 
topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 
'processed 100 records from topic' on" + str(node3.account))
+
+    @staticmethod
+    def prepare_for(processor, version):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, 
allow_fail=False)
+        processor.set_version(version)
+
+    def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with 
first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as 
first_other_monitor:
+            with 
second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as 
second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from 
topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 
'processed 100 records from topic' on" + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from 
topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 
'processed 100 records from topic' on" + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % 
processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from == "":  # upgrade disabled -- second round of rolling 
bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first  round of rolling boundes
+
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + 
processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + 
processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE 
+ roll_counter + str(counter), allow_fail=False)
+
+        if new_version == str(DEV_VERSION):
+            processor.set_version("")  # set to TRUNK
+        else:
+            processor.set_version(new_version)
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep 
\"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode 
subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with 
first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as 
first_other_monitor:
+                    with 
second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as 
second_other_monitor:
+                        processor.start()
+
+                        log_monitor.wait_until("Kafka version : " + 
new_version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka 
Streams version " + new_version + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records 
from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw 
output 'processed 100 records from topic' on" + str(first_other_node.account))
+                        found = 
list(first_other_node.account.ssh_capture(grep_metadata_error + 
first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable 
to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until("processed 100 records 
from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw 
output 'processed 100 records from topic' on" + str(second_other_node.account))
+                        found = 
list(second_other_node.account.ssh_capture(grep_metadata_error + 
second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable 
to decode subscription data: version=2'")
+
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 
'processed 100 records from topic' on" + str(node.account))
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f63a7c17ecd..94ba100bda7 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
         return DEV_BRANCH
 
 DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("0.11.0.3-SNAPSHOT")
 
 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -91,5 +92,7 @@ def get_version(node=None):
 
 # 0.11.0.0 versions
 V_0_11_0_0 = KafkaVersion("0.11.0.0")
-LATEST_0_11_0 = V_0_11_0_0
+V_0_11_0_1 = KafkaVersion("0.11.0.1")
+V_0_11_0_2 = KafkaVersion("0.11.0.2")
+LATEST_0_11_0 = V_0_11_0_2
 LATEST_0_11 = LATEST_0_11_0
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4c0add543aa..28b81ed05a8 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -64,6 +64,8 @@ get_kafka() {
 
     kafka_dir=/opt/kafka-$version
     
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
+    # the .tgz above does not include the streams test jar hence we need to 
get it separately
+    
url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that one point, there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to