Repository: kafka
Updated Branches:
  refs/heads/trunk 8f6a372ee -> 5df1eee7d


http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a3d7523..c3a372c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -20,13 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -37,11 +37,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
-import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestCondition;
@@ -51,21 +47,16 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.regex.Pattern;
 
-import static java.util.Collections.EMPTY_SET;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -100,18 +91,14 @@ public class StreamThreadTest {
         streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
     }
 
-    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
-    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
-    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
-    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
-    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
-    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
+    private final String topic1 = "topic1";
+
+    private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+    private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
 
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(1, 1);
-    private final TaskId task4 = new TaskId(1, 2);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -132,30 +119,19 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChangeForSingleGroup() {
-        internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = getStreamThread();
 
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
-            @Override
-            public Map<TaskId, Set<TopicPartition>> activeTasks() {
-                return activeTasks;
-            }
-        });
-
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
         assertEquals(thread.state(), StreamThread.State.CREATED);
 
         final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
         thread.setState(StreamThread.State.RUNNING);
-        assertTrue(thread.tasks().isEmpty());
 
         List<TopicPartition> revokedPartitions;
         List<TopicPartition> assignedPartitions;
-        Set<TopicPartition> expectedGroup1;
-        Set<TopicPartition> expectedGroup2;
 
         // revoke nothing
         revokedPartitions = Collections.emptyList();
@@ -165,59 +141,12 @@ public class StreamThreadTest {
 
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-        activeTasks.put(new TaskId(0, 1), expectedGroup1);
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
         Assert.assertEquals(4, stateListener.numChanges);
         Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);
-        assertTrue(thread.tasks().containsKey(task1));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(1, thread.tasks().size());
-
-        // revoke single partition
-        revokedPartitions = assignedPartitions;
-        activeTasks.clear();
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-
-        assertFalse(thread.tasks().containsKey(task1));
-        assertEquals(0, thread.tasks().size());
-
-        // assign different single partition
-        assignedPartitions = Collections.singletonList(t1p2);
-        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-        activeTasks.put(new TaskId(0, 2), expectedGroup2);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-        assertTrue(thread.tasks().containsKey(task2));
-        assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
-        assertEquals(1, thread.tasks().size());
-
-        // revoke different single partition and assign both partitions
-        revokedPartitions = assignedPartitions;
-        activeTasks.clear();
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        assignedPartitions = Arrays.asList(t1p1, t1p2);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
-        activeTasks.put(new TaskId(0, 1), expectedGroup1);
-        activeTasks.put(new TaskId(0, 2), expectedGroup2);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-        assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task2));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        // revoke all partitions and assign nothing
-        revokedPartitions = assignedPartitions;
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        assignedPartitions = Collections.emptyList();
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-        assertTrue(thread.tasks().isEmpty());
 
         thread.shutdown();
         assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN);
@@ -225,104 +154,6 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChangeForMultipleGroups() {
-        internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
-        internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2");
-        internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3");
-        internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
-
-        final StreamThread thread = getStreamThread();
-
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
-            @Override
-            public Map<TaskId, Set<TopicPartition>> activeTasks() {
-                return activeTasks;
-            }
-        });
-
-        final StateListenerStub stateListener = new StateListenerStub();
-        thread.setStateListener(stateListener);
-        assertEquals(thread.state(), StreamThread.State.CREATED);
-
-        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-        thread.setState(StreamThread.State.RUNNING);
-        assertTrue(thread.tasks().isEmpty());
-
-        List<TopicPartition> revokedPartitions;
-        List<TopicPartition> assignedPartitions;
-        Set<TopicPartition> expectedGroup1;
-        Set<TopicPartition> expectedGroup2;
-
-        // revoke nothing
-        revokedPartitions = Collections.emptyList();
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-
-        assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
-
-        // assign four new partitions of second subtopology
-        assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
-        expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-        expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
-        activeTasks.put(new TaskId(1, 1), expectedGroup1);
-        activeTasks.put(new TaskId(1, 2), expectedGroup2);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-
-        assertTrue(thread.tasks().containsKey(task3));
-        assertTrue(thread.tasks().containsKey(task4));
-        assertEquals(expectedGroup1, thread.tasks().get(task3).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        // revoke four partitions and assign three partitions of both subtopologies
-        revokedPartitions = assignedPartitions;
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-
-        assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-        expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-        activeTasks.put(new TaskId(0, 1), expectedGroup1);
-        activeTasks.put(new TaskId(1, 1), expectedGroup2);
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-
-        assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task3));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        // revoke all three partitons and reassign the same three partitions (from different subtopologies)
-        revokedPartitions = assignedPartitions;
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
-        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
-        expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-
-        assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task3));
-        assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
-        assertEquals(2, thread.tasks().size());
-
-        // revoke all partitions and assign nothing
-        revokedPartitions = assignedPartitions;
-        rebalanceListener.onPartitionsRevoked(revokedPartitions);
-        assignedPartitions = Collections.emptyList();
-        rebalanceListener.onPartitionsAssigned(assignedPartitions);
-        thread.runOnce(-1);
-
-        assertTrue(thread.tasks().isEmpty());
-
-        thread.shutdown();
-        assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
     public void testStateChangeStartClose() throws InterruptedException {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -367,128 +198,12 @@ public class StreamThreadTest {
                                    new MockStateRestoreListener());
     }
 
-    private final static String TOPIC = "topic";
-    private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
-    private final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
-        internalTopologyBuilder.addStateStore(
-            Stores
-                .create("store")
-                .withByteArrayKeys()
-                .withByteArrayValues()
-                .persistent()
-                .build()
-        );
-        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
-
-        TopicPartition tp0 = new TopicPartition(TOPIC, 0);
-        TopicPartition tp1 = new TopicPartition(TOPIC, 1);
-        clientSupplier.consumer.assign(Arrays.asList(tp0, tp1));
-        final Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(tp0, 0L);
-        offsets.put(tp1, 0L);
-        clientSupplier.consumer.updateBeginningOffsets(offsets);
-
-        final StreamThread thread1 = createStreamThread(clientId + 1, config, false);
-        final StreamThread thread2 = createStreamThread(clientId + 2, config, false);
-
-
-        final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
-        final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
-
-        final Map<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<>(task0);
-        final Map<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<>(task1);
-
-        thread1.setThreadMetadataProvider(new MockStreamsPartitionAssignor(thread1Assignment));
-        thread2.setThreadMetadataProvider(new MockStreamsPartitionAssignor(thread2Assignment));
-
-        // revoke (to get threads in correct state)
-        thread1.setState(StreamThread.State.RUNNING);
-        thread2.setState(StreamThread.State.RUNNING);
-        thread1.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
-        thread2.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
-
-        // assign
-        thread1.rebalanceListener.onPartitionsAssigned(task0Assignment);
-        thread1.runOnce(-1);
-        thread2.rebalanceListener.onPartitionsAssigned(task1Assignment);
-        thread2.runOnce(-1);
-
-        final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
-        originalTaskAssignmentThread1.addAll(thread1.tasks().keySet());
-        final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>();
-        originalTaskAssignmentThread2.addAll(thread2.tasks().keySet());
-
-        // revoke (task will be suspended)
-        thread1.rebalanceListener.onPartitionsRevoked(task0Assignment);
-        thread2.rebalanceListener.onPartitionsRevoked(task1Assignment);
-
-        assertThat(thread1.prevActiveTasks(), equalTo(originalTaskAssignmentThread1));
-        assertThat(thread2.prevActiveTasks(), equalTo(originalTaskAssignmentThread2));
-
-        // assign reverted
-        thread1Assignment.clear();
-        thread1Assignment.putAll(task1);
-
-        thread2Assignment.clear();
-        thread2Assignment.putAll(task0);
-
-        final Thread runIt = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                thread1.rebalanceListener.onPartitionsAssigned(task1Assignment);
-                thread1.runOnce(-1);
-            }
-        });
-        runIt.start();
-
-        thread2.rebalanceListener.onPartitionsAssigned(task0Assignment);
-        thread2.runOnce(-1);
-
-        runIt.join();
-
-        assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2));
-        assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1));
-    }
-
-    private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
-
-        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
-        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;
-
-        MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
-            this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        }
-
-        MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment,
-                                     final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) {
-            this.activeTaskAssignment = activeTaskAssignment;
-            this.standbyTaskAssignment = standbyTaskAssignment;
-        }
-
-        @Override
-        public Map<TaskId, Set<TopicPartition>> activeTasks() {
-            return activeTaskAssignment;
-        }
-
-        @Override
-        public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-            return standbyTaskAssignment;
-        }
-
-        @Override
-        public void close() {}
-    }
-
     @Test
     public void testMetrics() {
         final StreamThread thread = createStreamThread(clientId, config, false);
         final String defaultGroupName = "stream-metrics";
-        final String defaultPrefix = "thread." + thread.threadClientId();
-        final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
+        final String defaultPrefix = "thread." + thread.getName();
+        final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
 
         assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency"));
         assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency"));
@@ -529,19 +244,18 @@ public class StreamThreadTest {
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(internalTopologyBuilder,
-                                                     clientId,
-                                                     "",
-                                                     config,
-                                                     processId,
-                                                     mockTime,
-                                                     streamsMetadataState,
-                                                     taskManager,
-                                                     streamsMetrics,
-                                                     clientSupplier,
-                                                     consumer,
-                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
-                                                     stateDirectory);
+        final StreamThread thread = new StreamThread(mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext("")
+        );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
         thread.maybeCommit(mockTime.milliseconds());
@@ -562,19 +276,17 @@ public class StreamThreadTest {
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(internalTopologyBuilder,
-                                                     clientId,
-                                                     "",
-                                                     config,
-                                                     processId,
-                                                     mockTime,
-                                                     streamsMetadataState,
-                                                     taskManager,
-                                                     streamsMetrics,
-                                                     clientSupplier,
-                                                     consumer,
-                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
-                                                     stateDirectory);
+        final StreamThread thread = new StreamThread(mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
         thread.maybeCommit(mockTime.milliseconds());
@@ -596,19 +308,17 @@ public class StreamThreadTest {
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(internalTopologyBuilder,
-                                                     clientId,
-                                                     "",
-                                                     config,
-                                                     processId,
-                                                     mockTime,
-                                                     streamsMetadataState,
-                                                     taskManager,
-                                                     streamsMetrics,
-                                                     clientSupplier,
-                                                     consumer,
-                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
-                                                     stateDirectory);
+        final StreamThread thread = new StreamThread(mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval + 1);
         thread.maybeCommit(mockTime.milliseconds());
@@ -619,8 +329,6 @@ public class StreamThreadTest {
     @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
     private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer, final int numberOfCommits, final int commits) {
         final TaskManager taskManager = EasyMock.createMock(TaskManager.class);
-        taskManager.setConsumer(EasyMock.anyObject(Consumer.class));
-        EasyMock.expectLastCall();
         EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits);
         EasyMock.replay(taskManager, consumer);
         return taskManager;
@@ -628,18 +336,26 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
-        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
-        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
-        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment));
-
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        assignedPartitions.add(t1p2);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        activeTasks.put(task2, Collections.singleton(t1p2));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().createTasks(assignedPartitions);
+
+        thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
 
         assertEquals(1, clientSupplier.producers.size());
         final Producer globalProducer = clientSupplier.producers.get(0);
@@ -652,46 +368,55 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
-        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
-        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
-        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
-        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
-        assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2)));
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment));
-
-        final Set<TopicPartition> assignedPartitions = new HashSet<>();
-        Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2));
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        assignedPartitions.add(t1p2);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        activeTasks.put(task2, Collections.singleton(t1p2));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
+
         thread.runOnce(-1);
 
         assertEquals(thread.tasks().size(), clientSupplier.producers.size());
-        final Iterator it = clientSupplier.producers.iterator();
-        for (final Task task : thread.tasks().values()) {
-            assertSame(it.next(), ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer());
-        }
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
     }
 
     @Test
     public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
-        internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
-        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
-        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
-        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment));
-
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        assignedPartitions.add(t1p2);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        activeTasks.put(task2, Collections.singleton(t1p2));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().createTasks(assignedPartitions);
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.shutdown();
         thread.run();
@@ -706,26 +431,24 @@ public class StreamThreadTest {
     public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-        taskManager.setConsumer(EasyMock.anyObject(Consumer.class));
-        EasyMock.expectLastCall();
+        EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap());
+        EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, StandbyTask>emptyMap());
         taskManager.shutdown(true);
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(internalTopologyBuilder,
-                                                     clientId,
-                                                     "",
-                                                     config,
-                                                     processId,
-                                                     mockTime,
-                                                     streamsMetadataState,
-                                                     taskManager,
-                                                     streamsMetrics,
-                                                     clientSupplier,
-                                                     consumer,
-                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
-                                                     stateDirectory);
+        final StreamThread thread = new StreamThread(mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
         thread.setState(StreamThread.State.RUNNING);
         thread.shutdown();
         thread.run();
@@ -739,112 +462,55 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
-            @Override
-            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-                return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet(new TopicPartition("topic", 0)));
-            }
-        });
-
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
-    }
-
-    @Test
-    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() {
-        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
-        internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two");
-
-        final StreamThread thread = createStreamThread(clientId, config, false);
-        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
-        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
-                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
-                                                                                     0,
-                                                                                     null,
-                                                                                     new Node[0],
-                                                                                     new Node[0])));
-        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
-                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
-                                                                                     0,
-                                                                                     null,
-                                                                                     new Node[0],
-                                                                                     new Node[0])));
-
-
-        final HashMap<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
-        offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
-        restoreConsumer.updateEndOffsets(offsets);
-        restoreConsumer.updateBeginningOffsets(offsets);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        final TopicPartition t1 = new TopicPartition("t1", 0);
-        Set<TopicPartition> partitionsT1 = Utils.mkSet(t1);
-        standbyTasks.put(new TaskId(0, 0), partitionsT1);
-
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        final TopicPartition t2 = new TopicPartition("t2", 0);
-        Set<TopicPartition> partitionsT2 = Utils.mkSet(t2);
-        activeTasks.put(new TaskId(1, 0), partitionsT2);
-        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2, 0L));
 
-        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
-            @Override
-            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-                return standbyTasks;
-            }
-
-            @Override
-            public Map<TaskId, Set<TopicPartition>> activeTasks() {
-                return activeTasks;
-            }
-        });
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.setState(StreamThread.State.RUNNING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        clientSupplier.consumer.assign(partitionsT2);
-        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));
-        thread.runOnce(-1);
-        // swap the assignment around and make sure we don't get any exceptions
-        standbyTasks.clear();
-        activeTasks.clear();
-        standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
-        activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+        thread.taskManager().createTasks(Collections.<TopicPartition>emptyList());
 
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        clientSupplier.consumer.assign(partitionsT1);
-        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1));
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
     }
 
     @Test
     public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
-        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
+        internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
         final MockConsumer<byte[], byte[]> consumer = clientSupplier.consumer;
-        consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
-
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        activeTasks.put(task1, task0Assignment);
 
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks));
+        consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
 
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
-        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
         thread.runOnce(-1);
         assertThat(thread.tasks().size(), equalTo(1));
         final MockProducer producer = clientSupplier.producers.get(0);
 
         // change consumer subscription from "pattern" to "manual" to be able to call .addRecords()
-        consumer.updateBeginningOffsets(Collections.singletonMap(task0Assignment.iterator().next(), 0L));
+        consumer.updateBeginningOffsets(Collections.singletonMap(assignedPartitions.iterator().next(), 0L));
         consumer.unsubscribe();
-        consumer.assign(task0Assignment);
+        consumer.assign(new HashSet<>(assignedPartitions));
 
-        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
+        consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0]));
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1);
         thread.runOnce(-1);
         assertThat(producer.history().size(), equalTo(1));
@@ -862,7 +528,7 @@ public class StreamThreadTest {
 
         producer.fenceProducer();
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
-        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
+        consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0]));
         try {
             thread.runOnce(-1);
             fail("Should have thrown TaskMigratedException");
@@ -881,26 +547,31 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
-        internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+        internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null);
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
+        thread.setState(StreamThread.State.RUNNING);
+        thread.rebalanceListener.onPartitionsRevoked(null);
+
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        activeTasks.put(task1, task0Assignment);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks));
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
-        thread.setState(StreamThread.State.RUNNING);
-        thread.rebalanceListener.onPartitionsRevoked(null);
-        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
         thread.runOnce(-1);
 
         assertThat(thread.tasks().size(), equalTo(1));
 
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
-        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         try {
             thread.runOnce(-1);
             fail("Should have thrown TaskMigratedException");
@@ -909,80 +580,6 @@ public class StreamThreadTest {
         assertTrue(thread.tasks().isEmpty());
     }
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
-        internalTopologyBuilder.addSource(null, "source", null, null, null, Pattern.compile("t.*"));
-        internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
-
-        final StreamThread thread = createStreamThread(clientId, config, false);
-
-        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        final Map<String, Object> configurationMap = new HashMap<>();
-
-        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread);
-        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
-        partitionAssignor.configure(configurationMap);
-
-        thread.setThreadMetadataProvider(partitionAssignor);
-
-        final Field nodeToSourceTopicsField =
-            internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
-        nodeToSourceTopicsField.setAccessible(true);
-        final Map<String, List<String>>
-            nodeToSourceTopics =
-            (Map<String, List<String>>) nodeToSourceTopicsField.get(internalTopologyBuilder);
-        final List<TopicPartition> topicPartitions = new ArrayList<>();
-
-        final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
-        final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
-        final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
-
-        final TaskId taskId1 = new TaskId(0, 0);
-        final TaskId taskId2 = new TaskId(0, 0);
-        final TaskId taskId3 = new TaskId(0, 0);
-
-        List<TaskId> activeTasks = Utils.mkList(taskId1);
-
-        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
-
-        topicPartitions.addAll(Utils.mkList(topicPartition1));
-        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
-        partitionAssignor.onAssignment(assignment);
-
-        assertTrue(nodeToSourceTopics.get("source").size() == 1);
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
-
-        topicPartitions.clear();
-
-        activeTasks = Arrays.asList(taskId1, taskId2);
-        info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
-        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
-        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
-        partitionAssignor.onAssignment(assignment);
-
-        assertTrue(nodeToSourceTopics.get("source").size() == 2);
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
-
-        topicPartitions.clear();
-
-        activeTasks = Arrays.asList(taskId1, taskId2, taskId3);
-        info = new AssignmentInfo(activeTasks, standbyTasks,
-                               new HashMap<HostInfo, Set<TopicPartition>>());
-        topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
-        assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode());
-        partitionAssignor.onAssignment(assignment);
-
-        assertTrue(nodeToSourceTopics.get("source").size() == 3);
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-1"));
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-2"));
-        assertTrue(nodeToSourceTopics.get("source").contains("topic-3"));
-
-    }
-
     private static class StateListenerStub implements StreamThread.StateListener {
         int numChanges = 0;
         ThreadStateTransitionValidator oldState = null;
@@ -1006,33 +603,39 @@ public class StreamThreadTest {
         return createStreamThread(clientId, config, false);
     }
 
-
     @Test
     public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
-        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
+        internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
 
-        final TaskId taskId = new TaskId(0, 0);
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
-        assignment.put(taskId, task0Assignment);
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment));
-
         thread.setState(StreamThread.State.RUNNING);
 
         thread.rebalanceListener.onPartitionsRevoked(null);
-        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().createTasks(assignedPartitions);
+
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
         thread.runOnce(-1);
 
         ThreadMetadata threadMetadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
-        assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(taskId.toString(), task0Assignment)));
+        assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1))));
         assertTrue(threadMetadata.standbyTasks().isEmpty());
     }
 
     @Test
     public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
-        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1044,30 +647,28 @@ public class StreamThreadTest {
                         new Node[0])));
 
         final HashMap<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
+        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
-        final TaskId taskId = new TaskId(0, 0);
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        final TopicPartition t1 = new TopicPartition("t1", 0);
-        Set<TopicPartition> partitionsT1 = Utils.mkSet(t1);
-        standbyTasks.put(taskId, partitionsT1);
 
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks, standbyTasks));
+        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
 
-        thread.rebalanceListener.onPartitionsRevoked(task0Assignment);
-        thread.rebalanceListener.onPartitionsAssigned(null);
         thread.runOnce(-1);
 
         ThreadMetadata threadMetadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
-        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(taskId.toString(), partitionsT1)));
+        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1))));
         assertTrue(threadMetadata.activeTasks().isEmpty());
     }
 
@@ -1084,7 +685,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
-        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1107,48 +708,28 @@ public class StreamThreadTest {
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
-        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-        final TopicPartition t1p0 = new TopicPartition("t1", 0);
-        Set<TopicPartition> partitionsT1P0 = Utils.mkSet(t1p0);
-        standbyTasks.put(new TaskId(0, 0), partitionsT1P0);
-
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        final TopicPartition t1p1 = new TopicPartition("t1", 1);
-        Set<TopicPartition> partitionsT1P1 = Utils.mkSet(t1p1);
-        activeTasks.put(new TaskId(0, 1), partitionsT1P1);
         clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
-            @Override
-            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-                return standbyTasks;
-            }
-
-            @Override
-            public Map<TaskId, Set<TopicPartition>> activeTasks() {
-                return activeTasks;
-            }
-        });
 
         thread.setState(StreamThread.State.RUNNING);
 
-        thread.rebalanceListener.onPartitionsRevoked(partitionsT1P0);
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
 
-        clientSupplier.consumer.assign(partitionsT1P1);
-        thread.rebalanceListener.onPartitionsAssigned(partitionsT1P1);
-        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
-        thread.runOnce(-1);
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
-        standbyTasks.clear();
-        activeTasks.clear();
-        standbyTasks.put(new TaskId(0, 1), Utils.mkSet(t1p1));
-        activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1p0));
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task2, Collections.singleton(t1p2));
 
-        assertFalse(thread.threadMetadata().activeTasks().isEmpty());
-        assertFalse(thread.threadMetadata().standbyTasks().isEmpty());
+        thread.taskManager().setAssignmentMetadata(activeTasks, standbyTasks);
 
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
     }
 
     private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) {

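For readers following the StreamThreadTest changes above, a minimal illustrative sketch of the setup flow the rewritten tests follow; the assignment is installed directly on the thread's TaskManager instead of going through a mocked StreamPartitionAssignor. The fixture names (thread, task1, t1p1) are assumed to match the test class in the diff, not a complete runnable test on their own:

    // Sketch only, not part of this commit: the new test setup pattern.
    final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
    activeTasks.put(task1, Collections.singleton(t1p1));

    thread.setState(StreamThread.State.RUNNING);
    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    // Install the assignment on the TaskManager, then let the rebalance listener
    // and runOnce() drive task creation and processing.
    thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
    thread.taskManager().createTasks(Collections.singletonList(t1p1));
    thread.rebalanceListener.onPartitionsAssigned(Collections.singletonList(t1p1));
    thread.runOnce(-1);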
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index a399dd4..660a622 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -25,11 +25,8 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -79,10 +76,9 @@ public class StreamsKafkaClientTest {
     public void testConfigFromStreamsConfig() {
         for (final String expectedMechanism : asList("PLAIN", 
"SCRAM-SHA-512")) {
             config.put(SaslConfigs.SASL_MECHANISM, expectedMechanism);
-            final StreamsConfig streamsConfig = new StreamsConfig(config);
-            final AbstractConfig config = 
StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig);
-            assertEquals(expectedMechanism, 
config.values().get(SaslConfigs.SASL_MECHANISM));
-            assertEquals(expectedMechanism, 
config.getString(SaslConfigs.SASL_MECHANISM));
+            final AbstractConfig abstractConfig = 
StreamsKafkaClient.Config.fromStreamsConfig(config);
+            assertEquals(expectedMechanism, 
abstractConfig.values().get(SaslConfigs.SASL_MECHANISM));
+            assertEquals(expectedMechanism, 
abstractConfig.getString(SaslConfigs.SASL_MECHANISM));
         }
     }
 
@@ -138,7 +134,7 @@ public class StreamsKafkaClientTest {
     public void metricsShouldBeTaggedWithClientId() {
         config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id");
         config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
TestMetricsReporter.class.getName());
-        StreamsKafkaClient.create(new StreamsConfig(config));
+        StreamsKafkaClient.create(config);
         assertFalse(TestMetricsReporter.METRICS.isEmpty());
         for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
             assertEquals("some_client_id", 
kafkaMetric.metricName().tags().get("client-id"));
@@ -146,34 +142,6 @@ public class StreamsKafkaClientTest {
     }
 
     @Test(expected = StreamsException.class)
-    public void 
shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
-        kafkaClient.prepareResponse(null);
-        final StreamsKafkaClient streamsKafkaClient = 
createStreamsKafkaClient();
-        streamsKafkaClient.checkBrokerCompatibility(false);
-    }
-
-    @Test(expected = StreamsException.class)
-    public void 
shouldThrowStreamsExceptionWhenBrokerCompatibilityResponseInconsistent() {
-        kafkaClient.prepareResponse(new 
ProduceResponse(Collections.<TopicPartition, 
ProduceResponse.PartitionResponse>emptyMap()));
-        final StreamsKafkaClient streamsKafkaClient = 
createStreamsKafkaClient();
-        streamsKafkaClient.checkBrokerCompatibility(false);
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldRequireBrokerVersion0101OrHigherWhenEosDisabled() {
-        kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, 
Collections.singletonList(new 
ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE))));
-        final StreamsKafkaClient streamsKafkaClient = 
createStreamsKafkaClient();
-        streamsKafkaClient.checkBrokerCompatibility(false);
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldRequireBrokerVersions0110OrHigherWhenEosEnabled() {
-        kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, 
Collections.singletonList(new 
ApiVersionsResponse.ApiVersion(ApiKeys.CREATE_TOPICS))));
-        final StreamsKafkaClient streamsKafkaClient = 
createStreamsKafkaClient();
-        streamsKafkaClient.checkBrokerCompatibility(true);
-    }
-
-    @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionOnEmptyFetchMetadataResponse() {
         kafkaClient.prepareResponse(null);
         final StreamsKafkaClient streamsKafkaClient = 
createStreamsKafkaClient();
@@ -213,8 +181,7 @@ public class StreamsKafkaClientTest {
     }
 
     private StreamsKafkaClient createStreamsKafkaClient() {
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
-        return new 
StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
+        return new 
StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(config),
                                       kafkaClient,
                                       reporters,
                                       new LogContext());
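
The thread running through this file is that StreamsKafkaClient.Config.fromStreamsConfig(...) and StreamsKafkaClient.create(...) now take the raw config map instead of an already-built StreamsConfig. A hedged before/after sketch, assuming config is a Map<String, Object> as the test's field appears to be, with placeholder property values:

    // placeholder configuration, not taken from the test's setUp()
    final Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "client-test-app");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

    // before: StreamsKafkaClient.Config.fromStreamsConfig(new StreamsConfig(config))
    // after:  the raw map is passed straight through
    final AbstractConfig clientConfig = StreamsKafkaClient.Config.fromStreamsConfig(config);
    assertEquals("PLAIN", clientConfig.getString(SaslConfigs.SASL_MECHANISM));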

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 1640f9e..55dcf79 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -22,19 +22,28 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
+
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Pattern;
 
 import static org.easymock.EasyMock.checkOrder;
 import static org.easymock.EasyMock.expect;
@@ -54,9 +63,17 @@ public class TaskManagerTest {
     private final Set<TopicPartition> taskId0Partitions = Utils.mkSet(t1p0);
     private final Map<TaskId, Set<TopicPartition>> taskId0Assignment = 
Collections.singletonMap(taskId0, taskId0Partitions);
 
+    @Mock(type = MockType.STRICT)
+    private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates;
+    @Mock(type = MockType.STRICT)
+    private InternalTopologyBuilder topologyBuilder;
+    @Mock(type = MockType.STRICT)
+    private StateDirectory stateDirectory;
     @Mock(type = MockType.NICE)
     private ChangelogReader changeLogReader;
     @Mock(type = MockType.NICE)
+    private StreamsMetadataState streamsMetadataState;
+    @Mock(type = MockType.NICE)
     private Consumer<byte[], byte[]> restoreConsumer;
     @Mock(type = MockType.NICE)
     private Consumer<byte[], byte[]> consumer;
@@ -65,7 +82,7 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
     @Mock(type = MockType.NICE)
-    private ThreadMetadataProvider threadMetadataProvider;
+    private StreamsKafkaClient streamsKafkaClient;
     @Mock(type = MockType.NICE)
     private StreamTask streamTask;
     @Mock(type = MockType.NICE)
@@ -77,17 +94,35 @@ public class TaskManagerTest {
 
     private TaskManager taskManager;
 
+    private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
+    private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+    private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+    private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
+    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
+    private final TopicPartition t2p2 = new TopicPartition(topic2, 2);
+    private final TopicPartition t2p3 = new TopicPartition(topic2, 3);
+
+    private final TaskId task01 = new TaskId(0, 1);
+    private final TaskId task02 = new TaskId(0, 2);
+    private final TaskId task03 = new TaskId(0, 3);
+    private final TaskId task11 = new TaskId(1, 1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Before
     public void setUp() throws Exception {
         taskManager = new TaskManager(changeLogReader,
+                                      UUID.randomUUID(),
                                       "",
                                       restoreConsumer,
+                                      streamsMetadataState,
                                       activeTaskCreator,
                                       standbyTaskCreator,
+                                      streamsKafkaClient,
                                       active,
                                       standby);
-        taskManager.setThreadMetadataProvider(threadMetadataProvider);
         taskManager.setConsumer(consumer);
     }
 
@@ -97,18 +132,110 @@ public class TaskManagerTest {
                         consumer,
                         activeTaskCreator,
                         standbyTaskCreator,
-                        threadMetadataProvider,
                         active,
                         standby);
     }
 
     @Test
+    public void shouldUpdateSubscriptionFromAssignment() {
+        mockTopologyBuilder();
+        
expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
+        topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, 
topic2)), EasyMock.anyString());
+        expectLastCall().once();
+
+        EasyMock.replay(activeTaskCreator,
+                        topologyBuilder,
+                        subscriptionUpdates);
+
+        taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1, 
t2p1));
+
+        EasyMock.verify(activeTaskCreator,
+                        topologyBuilder,
+                        subscriptionUpdates);
+    }
+
+    @Test
+    public void shouldNotUpdateSubscriptionFromAssignment() {
+        mockTopologyBuilder();
+        expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1, 
topic2));
+
+        EasyMock.replay(activeTaskCreator,
+                        topologyBuilder,
+                        subscriptionUpdates);
+
+        taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1));
+
+        EasyMock.verify(activeTaskCreator,
+                        topologyBuilder,
+                        subscriptionUpdates);
+    }
+
+    @Test
+    public void shouldUpdateSubscriptionFromMetadata() {
+        mockTopologyBuilder();
+        
expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
+        topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, 
topic2)), EasyMock.anyString());
+        expectLastCall().once();
+
+        EasyMock.replay(activeTaskCreator,
+                topologyBuilder,
+                subscriptionUpdates);
+
+        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1, 
topic2));
+
+        EasyMock.verify(activeTaskCreator,
+                topologyBuilder,
+                subscriptionUpdates);
+    }
+
+    @Test
+    public void shouldNotUpdateSubscriptionFromMetadata() {
+        mockTopologyBuilder();
+        
expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
+
+        EasyMock.replay(activeTaskCreator,
+                topologyBuilder,
+                subscriptionUpdates);
+
+        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1));
+
+        EasyMock.verify(activeTaskCreator,
+                topologyBuilder,
+                subscriptionUpdates);
+    }
+
+    @Test
+    public void shouldReturnCachedTaskIdsFromDirectory() throws IOException {
+        File[] taskFolders = Utils.mkList(testFolder.newFolder("0_1"),
+                testFolder.newFolder("0_2"),
+                testFolder.newFolder("0_3"),
+                testFolder.newFolder("1_1"),
+                testFolder.newFolder("dummy")).toArray(new File[0]);
+
+        assertTrue((new File(taskFolders[0], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
+        assertTrue((new File(taskFolders[1], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
+        assertTrue((new File(taskFolders[3], 
ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile());
+
+        
expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once();
+        
expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
+
+        EasyMock.replay(activeTaskCreator, stateDirectory);
+
+        Set<TaskId> tasks = taskManager.cachedTasksIds();
+
+        EasyMock.verify(activeTaskCreator, stateDirectory);
+
+        assertThat(tasks, equalTo(Utils.mkSet(task01, task02, task11)));
+    }
+
+    @Test
     public void 
shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
         mockSingleActiveTask();
         active.closeNonAssignedSuspendedTasks(taskId0Assignment);
         expectLastCall();
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
 
         verify(active);
@@ -121,6 +248,7 @@ public class TaskManagerTest {
         expectLastCall();
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
 
         verify(active);
@@ -133,6 +261,7 @@ public class TaskManagerTest {
         EasyMock.expectLastCall();
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
         verify(changeLogReader);
     }
@@ -144,6 +273,7 @@ public class TaskManagerTest {
         active.addNewTask(EasyMock.same(streamTask));
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
 
         verify(activeTaskCreator, active);
@@ -152,10 +282,10 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedActiveTasks() {
         checkOrder(active, true);
-        mockThreadMetadataProvider(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), taskId0Assignment);
         EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, 
taskId0Partitions)).andReturn(true);
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
 
         // should be no calls to activeTaskCreator and no calls to 
active.addNewTasks(..)
@@ -169,6 +299,7 @@ public class TaskManagerTest {
         standby.addNewTask(EasyMock.same(standbyTask));
         replay();
 
+        taskManager.setAssignmentMetadata(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), taskId0Assignment);
         taskManager.createTasks(taskId0Partitions);
 
         verify(standbyTaskCreator, active);
@@ -177,10 +308,10 @@ public class TaskManagerTest {
     @Test
     public void shouldNotAddResumedStandbyTasks() {
         checkOrder(active, true);
-        mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, 
Set<TopicPartition>>emptyMap());
         EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, 
taskId0Partitions)).andReturn(true);
         replay();
 
+        taskManager.setAssignmentMetadata(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), taskId0Assignment);
         taskManager.createTasks(taskId0Partitions);
 
         // should be no calls to standbyTaskCreator and no calls to 
standby.addNewTasks(..)
@@ -196,6 +327,7 @@ public class TaskManagerTest {
         EasyMock.expectLastCall();
         replay();
 
+        taskManager.setAssignmentMetadata(taskId0Assignment, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
         taskManager.createTasks(taskId0Partitions);
         verify(consumer);
     }
@@ -276,25 +408,6 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseThreadMetadataProviderOnShutdown() {
-        threadMetadataProvider.close();
-        EasyMock.expectLastCall();
-        replay();
-
-        taskManager.shutdown(true);
-        verify(threadMetadataProvider);
-    }
-
-    @Test
-    public void shouldNotPropagateExceptionsOnShutdown() {
-        threadMetadataProvider.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException());
-        replay();
-
-        taskManager.shutdown(false);
-    }
-
-    @Test
     public void shouldInitializeNewActiveTasks() {
         EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
         
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
@@ -471,6 +584,26 @@ public class TaskManagerTest {
         EasyMock.verify(consumer);
     }
 
+    @Test
+    public void shouldUpdateTasksFromPartitionAssignment() {
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
+        assertTrue(taskManager.assignedActiveTasks().isEmpty());
+
+        // assign two active tasks with two partitions each
+        activeTasks.put(task01, new HashSet<>(Arrays.asList(t1p1, t2p1)));
+        activeTasks.put(task02, new HashSet<>(Arrays.asList(t1p2, t2p2)));
+
+        // assign one standby task with two partitions
+        standbyTasks.put(task03, new HashSet<>(Arrays.asList(t1p3, t2p3)));
+        taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
+
+        assertThat(taskManager.assignedActiveTasks(), equalTo(activeTasks));
+        assertThat(taskManager.assignedStandbyTasks(), equalTo(standbyTasks));
+    }
+
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
         EasyMock.expect(active.initializeNewTasks()).andReturn(new 
HashSet<TopicPartition>());
@@ -486,31 +619,22 @@ public class TaskManagerTest {
     }
 
     private void mockStandbyTaskExpectations() {
-        mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, 
Set<TopicPartition>>emptyMap());
         expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], 
byte[]>>anyObject(),
                                                    
EasyMock.eq(taskId0Assignment)))
                 .andReturn(Collections.singletonList(standbyTask));
 
     }
 
-    @SuppressWarnings("unchecked")
     private void mockSingleActiveTask() {
-        mockThreadMetadataProvider(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), taskId0Assignment);
-
-        
expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class),
+        expect(activeTaskCreator.createTasks(EasyMock.<Consumer<byte[], 
byte[]>>anyObject(),
                                                   
EasyMock.eq(taskId0Assignment)))
                 .andReturn(Collections.singletonList(streamTask));
 
     }
 
-    private void mockThreadMetadataProvider(final Map<TaskId, 
Set<TopicPartition>> standbyAssignment,
-                                            final Map<TaskId, 
Set<TopicPartition>> activeAssignment) {
-        expect(threadMetadataProvider.standbyTasks())
-                .andReturn(standbyAssignment)
-                .anyTimes();
-        expect(threadMetadataProvider.activeTasks())
-                .andReturn(activeAssignment)
-                .anyTimes();
+    private void mockTopologyBuilder() {
+        
expect(activeTaskCreator.builder()).andReturn(topologyBuilder).anyTimes();
+        
expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc"));
+        
expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates);
     }
-
 }
\ No newline at end of file
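
All of the TaskManagerTest changes above share one shape: with the ThreadMetadataProvider indirection removed, every test that exercises createTasks(...) first hands the assignment to the TaskManager itself. A minimal sketch of that call order, assuming the taskManager, taskId0 and taskId0Partitions fixtures and the mockSingleActiveTask()/replay() helpers already defined in the test:

    // expectations on activeTaskCreator, then switch the EasyMock mocks to replay mode
    mockSingleActiveTask();
    replay();

    // 1. the assignment metadata is set directly on the TaskManager ...
    taskManager.setAssignmentMetadata(
        Collections.singletonMap(taskId0, taskId0Partitions),   // active tasks
        Collections.<TaskId, Set<TopicPartition>>emptyMap());   // standby tasks

    // 2. ... and only then are tasks created for the partitions the consumer was assigned
    taskManager.createTasks(taskId0Partitions);

    // the individual tests then verify whichever mocks they care about, e.g. verify(active)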

http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java 
b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 3908305..598ca8d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -38,7 +38,7 @@ public class MockInternalTopicManager extends 
InternalTopicManager {
     private MockConsumer<byte[], byte[]> restoreConsumer;
 
     public MockInternalTopicManager(StreamsConfig streamsConfig, 
MockConsumer<byte[], byte[]> restoreConsumer) {
-        super(StreamsKafkaClient.create(streamsConfig), 0, 0, new MockTime());
+        super(StreamsKafkaClient.create(streamsConfig.originals()), 0, 0, new 
MockTime());
 
         this.restoreConsumer = restoreConsumer;
     }
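
MockInternalTopicManager keeps its StreamsConfig-based constructor but unwraps it with originals(), because StreamsKafkaClient.create(...) now expects the raw config map after this change. A hedged usage sketch with placeholder values (imports assumed: StreamsConfig, MockConsumer, OffsetResetStrategy, Properties):

    // placeholder configuration; any valid application id / bootstrap servers will do
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mock-topic-manager-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final MockConsumer<byte[], byte[]> restoreConsumer =
        new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    // originals() hands back the Map<String, Object> view that
    // StreamsKafkaClient.create(...) consumes internally
    final MockInternalTopicManager topicManager =
        new MockInternalTopicManager(new StreamsConfig(props), restoreConsumer);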
