[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549953#comment-16549953
 ] 

ASF GitHub Bot commented on KAFKA-5037:
---------------------------------------

guozhangwang closed pull request #5322: KAFKA-5037 Infinite loop if all input 
topics are unknown at startup
URL: https://github.com/apache/kafka/pull/5322
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise no longer show it once the pull request is merged):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index cef8116e880..8ed80dc524e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -136,7 +136,7 @@
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
-    private final StreamThread[] threads;
+    protected final StreamThread[] threads;
     private final StateDirectory stateDirectory;
     private final StreamsMetadataState streamsMetadataState;
     private final ScheduledExecutorService stateDirCleaner;
@@ -209,7 +209,7 @@ public boolean isValidTransition(final State newState) {
     }
 
     private final Object stateLock = new Object();
-    private volatile State state = State.CREATED;
+    protected volatile State state = State.CREATED;
 
     private boolean waitOnState(final State targetState, final long waitMs) {
         long begin = time.milliseconds();
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index eee5bc630b6..d1f7d93786f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -727,7 +727,7 @@
 
     public static class InternalConfig {
         public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = 
"__task.manager.instance__";
-        public static final String VERSION_PROBING_FLAG = 
"__version.probing.flag__";
+        public static final String ASSIGNMENT_ERROR_CODE = 
"__assignment.error.code__";
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index cee94886854..712f8a75514 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -20,7 +20,6 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,18 +79,14 @@ protected int maxNumPartitions(Cluster metadata, 
Set<String> topics) {
         int maxNumPartitions = 0;
         for (String topic : topics) {
             List<PartitionInfo> partitions = 
metadata.partitionsForTopic(topic);
-
             if (partitions.isEmpty()) {
+                log.error("Empty partitions for topic {}", topic);
+                throw new RuntimeException("Empty partitions for topic " + 
topic);
+            }
 
-                log.warn("Skipping creating tasks for the topic group {} since 
topic {}'s metadata is not available yet;"
-                         + " no tasks for this topic group will be assigned to 
any client.\n"
-                         + " Make sure all supplied topics in the topology are 
created before starting"
-                         + " as this could lead to unexpected results", 
topics, topic);
-                return StreamsPartitionAssignor.NOT_AVAILABLE;
-            } else {
-                int numPartitions = partitions.size();
-                if (numPartitions > maxNumPartitions)
-                    maxNumPartitions = numPartitions;
+            int numPartitions = partitions.size();
+            if (numPartitions > maxNumPartitions) {
+                maxNumPartitions = numPartitions;
             }
         }
         return maxNumPartitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f425bb4e617..31de839b1a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -63,7 +63,6 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
@@ -261,12 +260,18 @@ public void onPartitionsAssigned(final 
Collection<TopicPartition> assignment) {
                 taskManager.suspendedActiveTaskIds(),
                 taskManager.suspendedStandbyTaskIds());
 
+            if (streamThread.assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
+                log.debug("Received error code {} - shutdown", 
streamThread.assignmentErrorCode.get());
+                streamThread.shutdown();
+                streamThread.setStateListener(null);
+                return;
+            }
             final long start = time.milliseconds();
             try {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
-                if (!streamThread.versionProbingFlag.get()) {
+                if (streamThread.assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.NONE.code()) {
                     taskManager.createTasks(assignment);
                 }
             } catch (final Throwable t) {
@@ -302,8 +307,8 @@ public void onPartitionsRevoked(final 
Collection<TopicPartition> assignment) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
-                    if (streamThread.versionProbingFlag.get()) {
-                        streamThread.versionProbingFlag.set(false);
+                    if (streamThread.assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
+                        
streamThread.assignmentErrorCode.set(StreamsPartitionAssignor.Error.NONE.code());
                     } else {
                         taskManager.suspendTasksAndState();
                     }
@@ -563,7 +568,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> 
consumer,
     private final String logPrefix;
     private final TaskManager taskManager;
     private final StreamsMetricsThreadImpl streamsMetrics;
-    private final AtomicBoolean versionProbingFlag;
+    private final AtomicInteger assignmentErrorCode;
 
     private long lastCommitMs;
     private long timerStartedMs;
@@ -657,8 +662,8 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
         final String applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final Map<String, Object> consumerConfigs = 
config.getMainConsumerConfigs(applicationId, threadClientId);
         
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR,
 taskManager);
-        final AtomicBoolean versionProbingFlag = new AtomicBoolean();
-        consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, 
versionProbingFlag);
+        final AtomicInteger assignmentErrorCode = new AtomicInteger();
+        
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, 
assignmentErrorCode);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || 
!builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -679,7 +684,7 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
             builder,
             threadClientId,
             logContext,
-            versionProbingFlag);
+            assignmentErrorCode);
     }
 
     public StreamThread(final Time time,
@@ -693,7 +698,7 @@ public StreamThread(final Time time,
                         final InternalTopologyBuilder builder,
                         final String threadClientId,
                         final LogContext logContext,
-                        final AtomicBoolean versionProbingFlag) {
+                        final AtomicInteger assignmentErrorCode) {
         super(threadClientId);
 
         this.stateLock = new Object();
@@ -710,7 +715,7 @@ public StreamThread(final Time time,
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
-        this.versionProbingFlag = versionProbingFlag;
+        this.assignmentErrorCode = assignmentErrorCode;
 
         this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.commitTimeMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -765,7 +770,7 @@ private void runLoop() {
         while (isRunning()) {
             try {
                 recordsProcessedBeforeCommit = 
runOnce(recordsProcessedBeforeCommit);
-                if (versionProbingFlag.get()) {
+                if (assignmentErrorCode.get() == 
StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
                     log.info("Version probing detected. Triggering new 
rebalance.");
                     enforceRebalance();
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index db94ac0c852..d81d4f1511f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -51,7 +51,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
@@ -59,16 +59,44 @@
 public class StreamsPartitionAssignor implements PartitionAssignor, 
Configurable {
 
     private final static int UNKNOWN = -1;
-    public final static int NOT_AVAILABLE = -2;
     private final static int VERSION_ONE = 1;
     private final static int VERSION_TWO = 2;
     private final static int VERSION_THREE = 3;
+    private final static int VERSION_FOUR = 4;
     private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
     private int minReceivedMetadataVersion = UNKNOWN;
     protected Set<Integer> supportedVersions = new HashSet<>();
 
     private Logger log;
     private String logPrefix;
+    public enum Error {
+        NONE(0),
+        INCOMPLETE_SOURCE_TOPIC_METADATA(1),
+        VERSION_PROBING(2);
+
+        private final int code;
+
+        Error(final int code) {
+            this.code = code;
+        }
+
+        public int code() {
+            return code;
+        }
+
+        public static Error fromCode(final int code) {
+            switch (code) {
+                case 0:
+                    return NONE;
+                case 1:
+                    return INCOMPLETE_SOURCE_TOPIC_METADATA;
+                case 2:
+                    return VERSION_PROBING;
+                default:
+                    throw new IllegalArgumentException("Unknown error code: " 
+ code);
+            }
+        }
+    }
 
     private static class AssignedPartition implements 
Comparable<AssignedPartition> {
         public final TaskId taskId;
@@ -185,7 +213,7 @@ public int compare(final TopicPartition p1,
 
     private TaskManager taskManager;
     private PartitionGrouper partitionGrouper;
-    private AtomicBoolean versionProbingFlag;
+    private AtomicInteger assignmentErrorCode;
 
     protected int usedSubscriptionMetadataVersion = 
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
@@ -250,20 +278,20 @@ public void configure(final Map<String, ?> configs) {
 
         taskManager = (TaskManager) o;
 
-        final Object o2 = 
configs.get(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
-        if (o2 == null) {
-            final KafkaException fatalException = new 
KafkaException("VersionProbingFlag is not specified");
+        final Object ai = 
configs.get(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
+        if (ai == null) {
+            final KafkaException fatalException = new 
KafkaException("assignmentErrorCode is not specified");
             log.error(fatalException.getMessage(), fatalException);
             throw fatalException;
         }
 
-        if (!(o2 instanceof AtomicBoolean)) {
-            final KafkaException fatalException = new 
KafkaException(String.format("%s is not an instance of %s", 
o2.getClass().getName(), AtomicBoolean.class.getName()));
+        if (!(ai instanceof AtomicInteger)) {
+            final KafkaException fatalException = new 
KafkaException(String.format("%s is not an instance of %s",
+                ai.getClass().getName(), AtomicInteger.class.getName()));
             log.error(fatalException.getMessage(), fatalException);
             throw fatalException;
         }
-
-        versionProbingFlag = (AtomicBoolean) o2;
+        assignmentErrorCode = (AtomicInteger) ai;
 
         numStandbyReplicas = 
streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
@@ -319,6 +347,26 @@ public Subscription subscription(final Set<String> topics) 
{
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
+    Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> 
clientsMetadata,
+                                            final String topic,
+                                            final int errorCode) {
+        log.error("{} is unknown yet during rebalance," +
+            " please make sure they have been pre-created before starting the 
Streams application.", topic);
+        final Map<String, Assignment> assignment = new HashMap<>();
+        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
+            for (final String consumerId : clientMetadata.consumers) {
+                assignment.put(consumerId, new Assignment(
+                    Collections.emptyList(),
+                    new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        errorCode).encode()
+                ));
+            }
+        }
+        return assignment;
+    }
     /*
      * This assigns tasks to consumer clients in the following steps.
      *
@@ -409,6 +457,12 @@ public Subscription subscription(final Set<String> topics) 
{
 
         final Map<String, InternalTopicMetadata> repartitionTopicMetadata = 
new HashMap<>();
         for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
+            for (final String topic : topicsInfo.sourceTopics) {
+                if 
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
+                    !metadata.topics().contains(topic)) {
+                    return errorAssignment(clientsMetadata, topic, 
Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
+                }
+            }
             for (final InternalTopicConfig topic: 
topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new 
InternalTopicMetadata(topic));
             }
@@ -438,12 +492,9 @@ public Subscription subscription(final Set<String> topics) 
{
                                         numPartitionsCandidate = 
repartitionTopicMetadata.get(sourceTopicName).numPartitions;
                                     } else {
                                         numPartitionsCandidate = 
metadata.partitionCountForTopic(sourceTopicName);
-                                        if (numPartitionsCandidate == null) {
-                                            
repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
-                                        }
                                     }
 
-                                    if (numPartitionsCandidate != null && 
numPartitionsCandidate > numPartitions) {
+                                    if (numPartitionsCandidate > 
numPartitions) {
                                         numPartitions = numPartitionsCandidate;
                                     }
                                 }
@@ -582,7 +633,7 @@ public Subscription subscription(final Set<String> topics) {
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new 
HashMap<>();
-        if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 
3) {
+        if (minReceivedMetadataVersion >= 2) {
             for (final Map.Entry<UUID, ClientMetadata> entry : 
clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -658,7 +709,7 @@ public Subscription subscription(final Set<String> topics) {
                 // finally, encode the assignment before sending back to 
coordinator
                 assignment.put(consumer, new Assignment(
                     activePartitions,
-                    new AssignmentInfo(minUserMetadataVersion, active, 
standby, partitionsByHostState).encode()));
+                    new AssignmentInfo(minUserMetadataVersion, active, 
standby, partitionsByHostState, 0).encode()));
             }
         }
 
@@ -698,7 +749,8 @@ public Subscription subscription(final Set<String> topics) {
                         minUserMetadataVersion,
                         activeTasks,
                         standbyTasks,
-                        partitionsByHostState)
+                        partitionsByHostState,
+                        0)
                         .encode()
                 ));
             }
@@ -744,6 +796,11 @@ public void onAssignment(final Assignment assignment) {
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         final AssignmentInfo info = 
AssignmentInfo.decode(assignment.userData());
+        if (info.errCode() != Error.NONE.code) {
+            // set flag to shutdown streams app
+            assignmentErrorCode.set(info.errCode());
+            return;
+        }
         final int receivedAssignmentMetadataVersion = info.version();
         final int leaderSupportedVersion = info.latestSupportedVersion();
 
@@ -770,7 +827,7 @@ public void onAssignment(final Assignment assignment) {
                 usedSubscriptionMetadataVersion = leaderSupportedVersion;
             }
 
-            versionProbingFlag.set(true);
+            assignmentErrorCode.set(Error.VERSION_PROBING.code);
             return;
         }
 
@@ -801,6 +858,18 @@ public void onAssignment(final Assignment assignment) {
                 processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
+            case VERSION_FOUR:
+                if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
+                    log.info("Sent a version {} subscription and group 
leader's latest supported version is {}. " +
+                        "Upgrading subscription metadata version to {} for 
next rebalance.",
+                        usedSubscriptionMetadataVersion,
+                        leaderSupportedVersion,
+                        leaderSupportedVersion);
+                    usedSubscriptionMetadataVersion = leaderSupportedVersion;
+                }
+                processVersionFourAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
             default:
                 throw new IllegalStateException("This code should never be 
reached. Please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA/");
         }
@@ -854,6 +923,13 @@ private void processVersionThreeAssignment(final 
AssignmentInfo info,
         processVersionTwoAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
     }
 
+    private void processVersionFourAssignment(final AssignmentInfo info,
+                                              final List<TopicPartition> 
partitions,
+                                              final Map<TaskId, 
Set<TopicPartition>> activeTasks,
+                                              final Map<TopicPartition, 
PartitionInfo> topicToPartitionInfo) {
+        processVersionThreeAssignment(info, partitions, activeTasks, 
topicToPartitionInfo);
+    }
+
     // for testing
     protected void processLatestVersionAssignment(final AssignmentInfo info,
                                                   final List<TopicPartition> 
partitions,
@@ -877,9 +953,6 @@ private void prepareTopic(final Map<String, 
InternalTopicMetadata> topicPartitio
             final InternalTopicConfig topic = metadata.config;
             final int numPartitions = metadata.numPartitions;
 
-            if (numPartitions == NOT_AVAILABLE) {
-                continue;
-            }
             if (numPartitions < 0) {
                 throw new StreamsException(String.format("%sTopic [%s] number 
of partitions not defined", logPrefix, topic.name()));
             }
@@ -905,9 +978,12 @@ private void ensureCopartitioning(final 
Collection<Set<String>> copartitionGroup
 
     static class CopartitionedTopicsValidator {
         private final String logPrefix;
+        private Logger log;
 
         CopartitionedTopicsValidator(final String logPrefix) {
             this.logPrefix = logPrefix;
+            final LogContext logContext = new LogContext(logPrefix);
+            log = logContext.logger(getClass());
         }
 
         void validate(final Set<String> copartitionGroup,
@@ -918,9 +994,10 @@ void validate(final Set<String> copartitionGroup,
             for (final String topic : copartitionGroup) {
                 if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
                     final Integer partitions = 
metadata.partitionCountForTopic(topic);
-
                     if (partitions == null) {
-                        throw new 
org.apache.kafka.streams.errors.TopologyException(String.format("%sTopic not 
found: %s", logPrefix, topic));
+                        String str = String.format("%sTopic not found: %s", 
logPrefix, topic);
+                        log.error(str);
+                        throw new IllegalStateException(str);
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -930,9 +1007,6 @@ void validate(final Set<String> copartitionGroup,
                         Arrays.sort(topics);
                         throw new 
org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not 
co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
-                } else if 
(allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
-                    numPartitions = NOT_AVAILABLE;
-                    break;
                 }
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c577830e3e2..1179ca04097 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,11 +41,12 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(AssignmentInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int LATEST_SUPPORTED_VERSION = 4;
     static final int UNKNOWN = -1;
 
     private final int usedVersion;
     private final int latestSupportedVersion;
+    private int errCode;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
@@ -55,27 +56,29 @@ private AssignmentInfo(final int version,
                            final int latestSupportedVersion) {
         this.usedVersion = version;
         this.latestSupportedVersion = latestSupportedVersion;
+        this.errCode = 0;
     }
 
     public AssignmentInfo(final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
                           final Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
+        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 
0);
     }
 
     public AssignmentInfo() {
         this(LATEST_SUPPORTED_VERSION,
             Collections.emptyList(),
             Collections.emptyMap(),
-            Collections.emptyMap());
+            Collections.emptyMap(),
+            0);
     }
 
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                          final Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
hostState);
-
+                          final Map<HostInfo, Set<TopicPartition>> hostState,
+                          final int errCode) {
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
hostState, errCode);
         if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
             throw new IllegalArgumentException("version must be between 1 and 
" + LATEST_SUPPORTED_VERSION
                 + "; was: " + version);
@@ -87,18 +90,24 @@ public AssignmentInfo(final int version,
                    final int latestSupportedVersion,
                    final List<TaskId> activeTasks,
                    final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                   final Map<HostInfo, Set<TopicPartition>> hostState) {
+                   final Map<HostInfo, Set<TopicPartition>> hostState,
+                   final int errCode) {
         this.usedVersion = version;
         this.latestSupportedVersion = latestSupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
+        this.errCode = errCode;
     }
 
     public int version() {
         return usedVersion;
     }
 
+    public int errCode() {
+        return errCode;
+    }
+
     public int latestSupportedVersion() {
         return latestSupportedVersion;
     }
@@ -133,6 +142,9 @@ public ByteBuffer encode() {
                 case 3:
                     encodeVersionThree(out);
                     break;
+                case 4:
+                    encodeVersionFour(out);
+                    break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: 
" + usedVersion
                         + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
@@ -203,6 +215,14 @@ private void encodeVersionThree(final DataOutputStream 
out) throws IOException {
         encodePartitionsByHost(out);
     }
 
+    private void encodeVersionFour(final DataOutputStream out) throws 
IOException {
+        out.writeInt(4);
+        out.writeInt(LATEST_SUPPORTED_VERSION);
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+        out.writeInt(errCode);
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or 
if the data version is unknown
      */
@@ -214,6 +234,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
             final AssignmentInfo assignmentInfo;
 
             final int usedVersion = in.readInt();
+            int latestSupportedVersion;
             switch (usedVersion) {
                 case 1:
                     assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -224,10 +245,15 @@ public static AssignmentInfo decode(final ByteBuffer 
data) {
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
                 case 3:
-                    final int latestSupportedVersion = in.readInt();
+                    latestSupportedVersion = in.readInt();
                     assignmentInfo = new AssignmentInfo(usedVersion, 
latestSupportedVersion);
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
+                case 4:
+                    latestSupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, 
latestSupportedVersion);
+                    decodeVersionFourData(assignmentInfo, in);
+                    break;
                 default:
                     final TaskAssignmentException fatalException = new 
TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported 
version: " + LATEST_SUPPORTED_VERSION);
@@ -300,9 +326,16 @@ private static void decodeVersionThreeData(final 
AssignmentInfo assignmentInfo,
         decodeGlobalAssignmentData(assignmentInfo, in);
     }
 
+    private static void decodeVersionFourData(final AssignmentInfo 
assignmentInfo,
+                                              final DataInputStream in) throws 
IOException {
+        decodeVersionThreeData(assignmentInfo, in);
+        assignmentInfo.errCode = in.readInt();
+    }
+
     @Override
     public int hashCode() {
-        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ 
standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ 
standbyTasks.hashCode()
+            ^ partitionsByHost.hashCode() ^ errCode;
     }
 
     @Override
@@ -311,6 +344,7 @@ public boolean equals(final Object o) {
             final AssignmentInfo other = (AssignmentInfo) o;
             return usedVersion == other.usedVersion &&
                     latestSupportedVersion == other.latestSupportedVersion &&
+                    errCode == other.errCode &&
                     activeTasks.equals(other.activeTasks) &&
                     standbyTasks.equals(other.standbyTasks) &&
                     partitionsByHost.equals(other.partitionsByHost);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 4ebc95674b0..b4ad19f0204 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -32,7 +32,7 @@
 
     private static final Logger log = 
LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int LATEST_SUPPORTED_VERSION = 4;
     static final int UNKNOWN = -1;
 
     private final int usedVersion;
@@ -124,6 +124,9 @@ public ByteBuffer encode() {
             case 3:
                 buf = encodeVersionThree();
                 break;
+            case 4:
+                buf = encodeVersionFour();
+                break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + 
usedVersion
                     + "; latest supported version: " + 
LATEST_SUPPORTED_VERSION);
@@ -205,7 +208,7 @@ protected void encodeUserEndPoint(final ByteBuffer buf,
     private ByteBuffer encodeVersionThree() {
         final byte[] endPointBytes = prepareUserEndPoint();
 
-        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
         buf.putInt(3); // used version
         buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
@@ -217,7 +220,22 @@ private ByteBuffer encodeVersionThree() {
         return buf;
     }
 
-    protected int getVersionThreeByteLength(final byte[] endPointBytes) {
+    private ByteBuffer encodeVersionFour() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+
+        buf.putInt(4); // used version
+        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
+
+        return buf;
+    }
+
+    protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) 
{
         return 4 + // used version
                4 + // latest supported version version
                16 + // client ID
@@ -247,6 +265,7 @@ public static SubscriptionInfo decode(final ByteBuffer 
data) {
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
             case 3:
+            case 4:
                 latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, 
latestSupportedVersion);
                 decodeVersionThreeData(subscriptionInfo, data);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java
new file mode 100644
index 00000000000..9dd1fc1dcee
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Properties;
+
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+/**
+ *  This class allows to access the {@link KafkaStreams} a {@link 
StreamThread.StateListener} object.
+ *
+ */
+public class KafkaStreamsWrapper extends KafkaStreams {
+
+    public KafkaStreamsWrapper(final Topology topology,
+                               final Properties props) {
+        super(topology, props);
+    }
+
+    /**
+     * An app can set a single {@link StreamThread.StateListener} so that the 
app is notified when state changes.
+     *
+     * @param listener a new StreamThread state listener
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+     */
+    public void setStreamThreadStateListener(final StreamThread.StateListener 
listener) {
+        if (state == State.CREATED) {
+            for (final StreamThread thread : threads) {
+                thread.setStateListener(listener);
+            }
+        } else {
+            throw new IllegalStateException("Can only set StateListener in 
CREATED state. " +
+                "Current state is: " + state);
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 2ab6639ce05..a0b8f3d5aca 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -37,6 +37,8 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import 
org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import scala.Option;
@@ -61,6 +63,32 @@
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = 
"internal.leave.group.on.close";
 
+    /*
+     * Records state transition for StreamThread
+     */
+    public static class StateListenerStub implements 
StreamThread.StateListener {
+        boolean runningToRevokedSeen = false;
+        boolean revokedToPendingShutdownSeen = false;
+        @Override
+        public void onChange(final Thread thread,
+                             final ThreadStateTransitionValidator newState,
+                             final ThreadStateTransitionValidator oldState) {
+            if (oldState == StreamThread.State.RUNNING && newState == 
StreamThread.State.PARTITIONS_REVOKED) {
+                runningToRevokedSeen = true;
+            } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && 
newState == StreamThread.State.PENDING_SHUTDOWN) {
+                revokedToPendingShutdownSeen = true;
+            }
+        }
+
+        public boolean revokedToPendingShutdownSeen() {
+            return revokedToPendingShutdownSeen;
+        }
+
+        public boolean runningToRevokedSeen() {
+            return runningToRevokedSeen;
+        }
+    }
+
     /**
      * Removes local state stores.  Useful to reset state in-between 
integration test runs.
      *
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index de197fe3573..b1c36843db6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -91,16 +91,14 @@ public void 
shouldComputeGroupingForSingleGroupWithMultipleTopics() {
         assertEquals(expectedPartitionsForTask, 
grouper.partitionGroups(topicGroups, metadata));
     }
 
-    @Test
+    @Test(expected = RuntimeException.class)
     public void shouldNotCreateAnyTasksBecauseOneTopicHasUnknownPartitions() {
         final PartitionGrouper grouper = new DefaultPartitionGrouper();
-        final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new 
HashMap<>();
         final Map<Integer, Set<String>> topicGroups = new HashMap<>();
-
+    
         final int topicGroupId = 0;
-
+    
         topicGroups.put(topicGroupId, mkSet("topic1", "unknownTopic", 
"topic2"));
-
-        assertEquals(expectedPartitionsForTask, 
grouper.partitionGroups(topicGroups, metadata));
+        grouper.partitionGroups(topicGroups, metadata);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index b8221d8642a..a71f4883c64 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -46,7 +46,7 @@ public void before() {
         partitions.put(new TopicPartition("second", 1), new 
PartitionInfo("second", 1, null, null, null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = IllegalStateException.class)
     public void 
shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
                            Collections.<String, 
StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
@@ -98,27 +98,6 @@ public void 
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartition
         assertThat(three.numPartitions, equalTo(15));
     }
 
-    @Test
-    public void 
shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() {
-        final StreamsPartitionAssignor.InternalTopicMetadata one = 
createTopicMetadata("one", 1);
-        final StreamsPartitionAssignor.InternalTopicMetadata two = 
createTopicMetadata("two", StreamsPartitionAssignor.NOT_AVAILABLE);
-        final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> 
repartitionTopicConfig = new HashMap<>();
-
-        repartitionTopicConfig.put(one.config.name(), one);
-        repartitionTopicConfig.put(two.config.name(), two);
-
-        validator.validate(Utils.mkSet("first",
-                                       "second",
-                                       one.config.name(),
-                                       two.config.name()),
-                           repartitionTopicConfig,
-                           cluster.withPartitions(partitions));
-
-        assertThat(one.numPartitions, 
equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
-        assertThat(two.numPartitions, 
equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
-
-    }
-
     private StreamsPartitionAssignor.InternalTopicMetadata 
createTopicMetadata(final String repartitionTopic,
                                                                                
final int partitions) {
         final InternalTopicConfig repartitionTopicConfig
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2ccc89348fb..93d4e94a234 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -82,7 +82,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -305,7 +305,7 @@ public void shouldNotCommitBeforeTheCommitInterval() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -339,7 +339,7 @@ public void shouldNotCauseExceptionIfNothingCommitted() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -374,7 +374,7 @@ public void shouldCommitAfterTheCommitInterval() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval + 1);
@@ -523,7 +523,7 @@ public void shouldShutdownTaskManagerOnClose() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.setStateListener(
             new StreamThread.StateListener() {
@@ -560,7 +560,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.shutdown();
         EasyMock.verify(taskManager);
@@ -588,7 +588,7 @@ public void shouldOnlyShutdownOnce() {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.shutdown();
         // Execute the run method. Verification of the mock will check that 
shutdown was only done once
@@ -1288,7 +1288,8 @@ public void producerMetricsVerificationWithoutEOS() {
                 internalTopologyBuilder,
                 clientId,
                 new LogContext(""),
-                new AtomicBoolean());
+                new AtomicInteger()
+                );
         final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<String, String>());
         final Metric testMetric = new KafkaMetric(
                 new Object(),
@@ -1331,7 +1332,8 @@ public void adminClientMetricsVerification() {
                 internalTopologyBuilder,
                 clientId,
                 new LogContext(""),
-                new AtomicBoolean());
+                new AtomicInteger()
+                );
         final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<String, String>());
         final Metric testMetric = new KafkaMetric(
                 new Object(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 4327e8f1ee4..2577bb84abf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -58,7 +58,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -122,7 +122,7 @@
         configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
         configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
userEndPoint);
         
configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR,
 taskManager);
-        
configurationMap.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, new 
AtomicBoolean());
+        
configurationMap.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, new 
AtomicInteger());
         return configurationMap;
     }
 
@@ -993,20 +993,9 @@ public Object apply(final Object value1, final Object 
value2) {
 
         final Map<String, PartitionAssignor.Assignment> assignment = 
partitionAssignor.assign(metadata, subscriptions);
 
-        final Map<String, Integer> expectedCreatedInternalTopics = new 
HashMap<>();
-        expectedCreatedInternalTopics.put(applicationId + 
"-count-repartition", 3);
-        expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 
3);
-        assertThat(mockInternalTopicManager.readyTopics, 
equalTo(expectedCreatedInternalTopics));
+        assertThat(mockInternalTopicManager.readyTopics.isEmpty(), 
equalTo(true));
 
-        final List<TopicPartition> expectedAssignment = Arrays.asList(
-            new TopicPartition("topic1", 0),
-            new TopicPartition("topic1", 1),
-            new TopicPartition("topic1", 2),
-            new TopicPartition(applicationId + "-count-repartition", 0),
-            new TopicPartition(applicationId + "-count-repartition", 1),
-            new TopicPartition(applicationId + "-count-repartition", 2)
-        );
-        assertThat(new HashSet<>(assignment.get(client).partitions()), 
equalTo(new HashSet<>(expectedAssignment)));
+        assertThat(assignment.get(client).partitions().isEmpty(), 
equalTo(true));
     }
 
     @Test
@@ -1109,29 +1098,29 @@ public void 
shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance(
     }
 
     @Test
-    public void shouldThrowKafkaExceptionVersionProbingFlagNotConfigured() {
+    public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() {
         final Map<String, Object> config = configProps();
-        config.remove(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
+        config.remove(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
 
         try {
             partitionAssignor.configure(config);
             fail("Should have thrown KafkaException");
         } catch (final KafkaException expected) {
-            assertThat(expected.getMessage(), equalTo("VersionProbingFlag is 
not specified"));
+            assertThat(expected.getMessage(), equalTo("assignmentErrorCode is 
not specified"));
         }
     }
 
     @Test
-    public void 
shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicBoolean() {
+    public void 
shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() {
         final Map<String, Object> config = configProps();
-        config.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, "i am 
not an AtomicBoolean");
+        config.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, "i am 
not an AtomicInteger");
 
         try {
             partitionAssignor.configure(config);
             fail("Should have thrown KafkaException");
         } catch (final KafkaException expected) {
             assertThat(expected.getMessage(),
-                equalTo("java.lang.String is not an instance of 
java.util.concurrent.atomic.AtomicBoolean"));
+                equalTo("java.lang.String is not an instance of 
java.util.concurrent.atomic.AtomicInteger"));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index c7382e7671c..8b990659da1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -59,33 +59,39 @@ public void shouldUseLatestSupportedVersionByDefault() {
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowForUnknownVersion1() {
-        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment);
+        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment, 0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowForUnknownVersion2() {
-        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, 
activeTasks, standbyTasks, globalAssignment);
+        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, 
activeTasks, standbyTasks, globalAssignment, 0);
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion1() {
-        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, 
standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(1, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, 
Set<TopicPartition>>emptyMap());
+        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, 
standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(1, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, 
Set<TopicPartition>>emptyMap(), 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion2() {
-        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, 
standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(2, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, 
standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(2, 
AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment, 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion3() {
-        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, 
standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(3, 
AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
globalAssignment);
+        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, 
standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(3, 
AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
globalAssignment, 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion4() {
+        final AssignmentInfo info = new AssignmentInfo(4, activeTasks, 
standbyTasks, globalAssignment, 2);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(4, 
AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, 
globalAssignment, 2);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 0611bfc4d5c..2a75c57b237 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -76,6 +76,13 @@ public void shouldEncodeAndDecodeVersion3() {
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion4() {
+        final SubscriptionInfo info = new SubscriptionInfo(4, processId, 
activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(4, 
SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, 
standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
     @Test
     public void shouldAllowToDecodeFutureSupportedVersion() {
         final SubscriptionInfo info = 
SubscriptionInfo.decode(encodeFutureVersion());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 1b01a7300a1..33e9b9771c2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -274,7 +274,7 @@ public ByteBuffer encode() {
         private ByteBuffer encodeFutureVersion() {
             final byte[] endPointBytes = prepareUserEndPoint();
 
-            final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+            final ByteBuffer buf = 
ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
             buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version
             buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
new file mode 100644
index 00000000000..78ed591073a
--- /dev/null
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert._
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Test suite base that prepares Kafka cluster for stream-table joins in Kafka 
Streams
+ * <p>
+ */
+class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with 
StreamToTableJoinTestData {
+
+  private val privateCluster: EmbeddedKafkaCluster = new 
EmbeddedKafkaCluster(1)
+
+  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
+
+  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
+  val mockTime: MockTime = cluster.time
+  mockTime.setCurrentTimeMs(alignedTime)
+
+  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
+  @Rule def testFolder: TemporaryFolder = tFolder
+
+  @Before
+  def startKafkaCluster(): Unit = {
+    cluster.createTopic(userClicksTopic)
+    cluster.createTopic(userRegionsTopic)
+    cluster.createTopic(outputTopic)
+    cluster.createTopic(userClicksTopicJ)
+    cluster.createTopic(userRegionsTopicJ)
+    cluster.createTopic(outputTopicJ)
+  }
+
+  def getStreamsConfiguration(): Properties = {
+    val streamsConfiguration: Properties = new Properties()
+
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"stream-table-join-scala-integration-test")
+    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers())
+    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
+    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot.getPath)
+
+    streamsConfiguration
+  }
+
+  private def getUserRegionsProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+    p
+  }
+
+  private def getUserClicksProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[LongSerializer])
+    p
+  }
+
+  private def getConsumerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ConsumerConfig.GROUP_ID_CONFIG, 
"join-scala-integration-test-standard-consumer")
+    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
+    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[LongDeserializer])
+    p
+  }
+
+  def produceNConsume(userClicksTopic: String,
+                              userRegionsTopic: String,
+                              outputTopic: String,
+                              waitTillRecordsReceived: Boolean = true): 
java.util.List[KeyValue[String, Long]] = {
+
+    import collection.JavaConverters._
+
+    // Publish user-region information.
+    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+                                                       userRegions.asJava,
+                                                       
userRegionsProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    // Publish user-click information.
+    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
+                                                       userClicks.asJava,
+                                                       
userClicksProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    if (waitTillRecordsReceived) {
+      // consume and verify result
+      val consumerConfig = getConsumerConfig()
+
+          
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
outputTopic, expectedClicksPerRegion.size)
+    } else {
+      java.util.Collections.emptyList()
+    }
+  }
+}
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 7891131aa9e..e5253f95d45 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.test.TestUtils
@@ -41,28 +42,7 @@ import org.scalatest.junit.JUnitSuite
  * Note: In the current project settings SAM type conversion is turned off as 
it's experimental in Scala 2.11.
  * Hence the native Java API based version is more verbose.
  */
-class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite 
with StreamToTableJoinTestData {
-
-  private val privateCluster: EmbeddedKafkaCluster = new 
EmbeddedKafkaCluster(1)
-
-  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
-
-  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
-  val mockTime: MockTime = cluster.time
-  mockTime.setCurrentTimeMs(alignedTime)
-
-  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
-  @Rule def testFolder: TemporaryFolder = tFolder
-
-  @Before
-  def startKafkaCluster(): Unit = {
-    cluster.createTopic(userClicksTopic)
-    cluster.createTopic(userRegionsTopic)
-    cluster.createTopic(outputTopic)
-    cluster.createTopic(userClicksTopicJ)
-    cluster.createTopic(userRegionsTopicJ)
-    cluster.createTopic(outputTopicJ)
-  }
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends 
StreamToTableJoinScalaIntegrationTestBase {
 
   @Test def testShouldCountClicksPerRegion(): Unit = {
 
@@ -101,7 +81,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends JUnitSuite wit
 
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
-
     streams.close()
 
     import collection.JavaConverters._
@@ -172,74 +151,4 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends JUnitSuite wit
     streams.close()
     assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), 
expectedClicksPerRegion.sortBy(_.key))
   }
-
-  private def getStreamsConfiguration(): Properties = {
-    val streamsConfiguration: Properties = new Properties()
-
-    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"stream-table-join-scala-integration-test")
-    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers())
-    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
-    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot.getPath)
-
-    streamsConfiguration
-  }
-
-  private def getUserRegionsProducerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
-    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
-    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
-    p
-  }
-
-  private def getUserClicksProducerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
-    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
-    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[LongSerializer])
-    p
-  }
-
-  private def getConsumerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ConsumerConfig.GROUP_ID_CONFIG, 
"join-scala-integration-test-standard-consumer")
-    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[StringDeserializer])
-    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[LongDeserializer])
-    p
-  }
-
-  private def produceNConsume(userClicksTopic: String,
-                              userRegionsTopic: String,
-                              outputTopic: String): 
java.util.List[KeyValue[String, Long]] = {
-
-    import collection.JavaConverters._
-
-    // Publish user-region information.
-    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
-                                                       userRegions.asJava,
-                                                       
userRegionsProducerConfig,
-                                                       mockTime,
-                                                       false)
-
-    // Publish user-click information.
-    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
-                                                       userClicks.asJava,
-                                                       
userClicksProducerConfig,
-                                                       mockTime,
-                                                       false)
-
-    // consume and verify result
-    val consumerConfig = getConsumerConfig()
-
-    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
outputTopic, expectedClicksPerRegion.size)
-  }
 }
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
new file mode 100644
index 00000000000..f5a098b3870
--- /dev/null
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert._
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Test suite that verifies the shutdown of StreamThread when metadata is 
incomplete during stream-table joins in Kafka Streams
+ * <p>
+ */
+class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends 
StreamToTableJoinScalaIntegrationTestBase {
+
+  @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = {
+
+    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) 
that will set up all Serialized, Produced,
+    // Consumed and Joined instances. So all APIs below that accept 
Serialized, Produced, Consumed or Joined will
+    // get these instances automatically
+    import Serdes._
+
+    val streamsConfiguration: Properties = getStreamsConfiguration()
+
+    val builder = new StreamsBuilder()
+
+    val userClicksStream: KStream[String, Long] = 
builder.stream(userClicksTopic)
+
+    val userRegionsTable: KTable[String, String] = 
builder.table(userRegionsTopic+"1")
+
+    // Compute the total per region by summing the individual click counts per 
region.
+    val clicksPerRegion: KTable[String, Long] =
+      userClicksStream
+
+      // Join the stream against the table.
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) 
"UNKNOWN" else region, clicks))
+
+        // Change the stream from <user> -> <region, clicks> to <region> -> 
<clicks>
+        .map((_, regionWithClicks) => regionWithClicks)
+
+        // Compute the total per region by summing the individual click counts 
per region.
+        .groupByKey
+        .reduce(_ + _)
+
+    // Write the (continuously updating) results to the output topic.
+    clicksPerRegion.toStream.to(outputTopic)
+
+    val streams: KafkaStreamsWrapper = new 
KafkaStreamsWrapper(builder.build(), streamsConfiguration)
+    val listener = new IntegrationTestUtils.StateListenerStub()
+    streams.setStreamThreadStateListener(listener)
+    streams.start()
+
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false)
+    while (!listener.revokedToPendingShutdownSeen()) {
+      Thread.sleep(3)
+    }
+    streams.close()
+    assertEquals(listener.runningToRevokedSeen(), true)
+    assertEquals(listener.revokedToPendingShutdownSeen(), true)
+  }
+}
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 41134672e98..4d7215f6823 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -510,22 +510,22 @@ def do_rolling_bounce(self, processor, counter, 
current_generation):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    leader_monitor.wait_until("Received a future (version 
probing) subscription (version: 4). Sending empty assignment back (with 
supported version 3).",
+                    leader_monitor.wait_until("Received a future (version 
probing) subscription (version: 5). Sending empty assignment back (with 
supported version 4).",
                                               timeout_sec=60,
                                               err_msg="Could not detect 
'version probing' attempt at leader " + str(self.leader.node.account))
 
                     if len(self.old_processors) > 0:
-                        log_monitor.wait_until("Sent a version 4 subscription 
and got version 3 assignment back (successful version probing). Downgrading 
subscription metadata to received version and trigger new rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Downgrading 
subscription metadata to received version and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 
'successful version probing' at upgrading node " + str(node.account))
                     else:
-                        log_monitor.wait_until("Sent a version 4 subscription 
and got version 3 assignment back (successful version probing). Setting 
subscription metadata to leaders supported version 4 and trigger new 
rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription 
and got version 4 assignment back (successful version probing). Setting 
subscription metadata to leaders supported version 5 and trigger new 
rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 
'successful version probing with upgraded leader' at upgrading node " + 
str(node.account))
-                        first_other_monitor.wait_until("Sent a version 3 
subscription and group leader.s latest supported version is 4. Upgrading 
subscription metadata version to 4 for next rebalance.",
+                        first_other_monitor.wait_until("Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.",
                                                        timeout_sec=60,
                                                        err_msg="Never saw 
output 'Upgrade metadata to version 4' on" + str(first_other_node.account))
-                        second_other_monitor.wait_until("Sent a version 3 
subscription and group leader.s latest supported version is 4. Upgrading 
subscription metadata version to 4 for next rebalance.",
+                        second_other_monitor.wait_until("Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.",
                                                         timeout_sec=60,
                                                         err_msg="Never saw 
output 'Upgrade metadata to version 4' on" + str(second_other_node.account))
 
@@ -553,6 +553,6 @@ def do_rolling_bounce(self, processor, counter, 
current_generation):
 
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
-            found = list(p.node.account.ssh_capture("grep \"Sent a version 3 
subscription and group leader.s latest supported version is 4. Upgrading 
subscription metadata version to 4 for next rebalance.\" " + p.LOG_FILE, 
allow_fail=True))
+            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 
subscription and group leader.s latest supported version is 5. Upgrading 
subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, 
allow_fail=True))
             if len(found) > 0:
                 raise Exception("Kafka Streams failed with 'group member 
upgraded to metadata 4 too early'")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Infinite loop if all input topics are unknown at startup
> --------------------------------------------------------
>
>                 Key: KAFKA-5037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5037
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Matthias J. Sax
>            Assignee: Ted Yu
>            Priority: Major
>              Labels: newbie++, user-experience
>             Fix For: 2.1.0
>
>         Attachments: 5037.v2.txt, 5037.v4.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and add many 
> more tests for all kinds of corner cases, including pattern subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to