This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b9dd203  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382) (#8352)
b9dd203 is described below

commit b9dd203f9ab0a18e77a9c29b82b2ab80fcfd03d9
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Wed Mar 25 23:03:44 2020 +0100

    KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382) (#8352)
    
    The previous KafkaStreamsTest took 2min20s on my local laptop, because many of its integration tests, which produce / consume records and check the state directory file system, take a lot of time. These tests can be greatly simplified with mocks.
    
    This change reduces the clumsy integration test class to a unit test class that mocks its internal modules. Some test functions did not actually belong in KafkaStreamsTest and have been moved to other, more modular test classes. The class now runs in 2s.
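    
    For reference, the pattern the rewritten test builds on looks roughly like the following minimal sketch. It is only an illustration, assuming the powermock-module-junit4 and powermock-api-easymock test dependencies that build.gradle adds below; the Worker class is a hypothetical stand-in for StreamThread and its static factory, whose real signature is much longer:
    
        import org.easymock.EasyMock;
        import org.junit.Test;
        import org.junit.runner.RunWith;
        import org.powermock.api.easymock.PowerMock;
        import org.powermock.core.classloader.annotations.PrepareForTest;
        import org.powermock.modules.junit4.PowerMockRunner;
        
        import static org.junit.Assert.assertEquals;
        
        @RunWith(PowerMockRunner.class)
        @PrepareForTest(WorkerMockingSketchTest.Worker.class)
        public class WorkerMockingSketchTest {
        
            // Hypothetical collaborator with a static factory, standing in for
            // StreamThread.create(...): no real thread may be created in a unit test.
            public static class Worker {
                public static Worker create() {
                    throw new IllegalStateException("no real Worker in unit tests");
                }
                public String state() {
                    return "RUNNING";
                }
            }
        
            @Test
            public void shouldHandOutMockFromStaticFactory() {
                // record phase: intercept the static factory so the code under
                // test receives a mock instead of a real instance
                PowerMock.mockStatic(Worker.class);
                final Worker worker = EasyMock.createMock(Worker.class);
                EasyMock.expect(Worker.create()).andReturn(worker);
                EasyMock.expect(worker.state()).andReturn("DEAD");
                PowerMock.replay(Worker.class);
                EasyMock.replay(worker);
        
                // the intercepted factory hands back the mock with its canned answer
                assertEquals("DEAD", Worker.create().state());
        
                PowerMock.verify(Worker.class);
                EasyMock.verify(worker);
            }
        }
    
    The actual test applies this interception to StreamThread.create(...), new GlobalStreamThread(...), and new Metrics(...), as shown in the diff.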
    
    It also helps remove the potential flakiness of the following tickets (some of them are claimed resolved only because we have not seen them recently, but after looking at the test code I can verify they are still flaky):
    
    * KAFKA-5818 (the original JIRA ticket indeed exposed a real issue that has been fixed, but the test itself remains flaky)
    * KAFKA-6215
    * KAFKA-7921
    * KAFKA-7990
    * KAFKA-8319
    * KAFKA-8427
    
    Reviewers: Bill Bejeck <b...@confluent.io>, John Roesler <j...@confluent.io>, Bruno Cadonna <br...@confluent.io>
    
    This commit was cherry-picked from trunk and adapted.
    
    Co-authored-by: Guozhang Wang <wangg...@gmail.com>
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 build.gradle                                       |   2 +
 .../kafka/clients/consumer/MockConsumer.java       |   1 -
 .../org/apache/kafka/streams/KafkaStreams.java     |  64 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 882 +++++++++++++--------
 .../apache/kafka/streams/StreamsConfigTest.java    |  26 +
 .../processor/internals/StreamThreadTest.java      |  15 +-
 .../org/apache/kafka/test/MockClientSupplier.java  |   2 +-
 7 files changed, 615 insertions(+), 377 deletions(-)

diff --git a/build.gradle b/build.gradle
index dbd6ac3..867e512 100644
--- a/build.gradle
+++ b/build.gradle
@@ -976,6 +976,8 @@ project(':streams') {
     testCompile libs.log4j
     testCompile libs.junit
     testCompile libs.easymock
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
     testCompile libs.bcpkix
     testCompile libs.hamcrest
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index be95627..1c90ba1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -434,7 +434,6 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public synchronized void close(long timeout, TimeUnit unit) {
-        ensureNotClosed();
         this.closed = true;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index db825b1..ce113d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -144,7 +144,7 @@ public class KafkaStreams {
     private final QueryableStoreProvider queryableStoreProvider;
     private final AdminClient adminClient;
 
-    private GlobalStreamThread globalStreamThread;
+    GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
     private StateRestoreListener globalStateRestoreListener;
 
@@ -424,7 +424,7 @@ public class KafkaStreams {
         private void maybeSetRunning() {
             // one thread is running, check others, including global thread
             for (final StreamThread.State state : threadState.values()) {
-                if (state != StreamThread.State.RUNNING) {
+                if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) {
                     return;
                 }
             }
@@ -852,45 +852,43 @@ public class KafkaStreams {
             // wait for all threads to join in a separate thread;
             // save the current thread so that if it is a stream thread
             // we don't attempt to join it and cause a deadlock
-            final Thread shutdownThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    // notify all the threads to stop; avoid deadlocks by stopping any
-                    // further state reports from the thread since we're shutting down
-                    for (final StreamThread thread : threads) {
-                        thread.setStateListener(null);
-                        thread.shutdown();
-                    }
+            final Thread shutdownThread = new Thread(() -> {
 
-                    for (final StreamThread thread : threads) {
-                        try {
-                            if (!thread.isRunning()) {
-                                thread.join();
-                            }
-                        } catch (final InterruptedException ex) {
-                            Thread.currentThread().interrupt();
+                // notify all the threads to stop; avoid deadlocks by stopping any
+                // further state reports from the thread since we're shutting down
+                for (final StreamThread thread : threads) {
+                    thread.setStateListener(null);
+                    thread.shutdown();
+                }
+
+                for (final StreamThread thread : threads) {
+                    try {
+                        if (!thread.isRunning()) {
+                            thread.join();
                         }
+                    } catch (final InterruptedException ex) {
+                        Thread.currentThread().interrupt();
                     }
+                }
 
-                    if (globalStreamThread != null) {
-                        globalStreamThread.setStateListener(null);
-                        globalStreamThread.shutdown();
-                    }
+                if (globalStreamThread != null) {
+                    globalStreamThread.setStateListener(null);
+                    globalStreamThread.shutdown();
+                }
 
-                    if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
-                        try {
-                            globalStreamThread.join();
-                        } catch (final InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
-                        globalStreamThread = null;
+                if (globalStreamThread != null && !globalStreamThread.stillRunning()) {
+                    try {
+                        globalStreamThread.join();
+                    } catch (final InterruptedException e) {
+                        Thread.currentThread().interrupt();
                     }
+                    globalStreamThread = null;
+                }
 
-                    adminClient.close();
+                adminClient.close();
 
-                    metrics.close();
-                    setState(State.NOT_RUNNING);
-                }
+                metrics.close();
+                setState(State.NOT_RUNNING);
             }, "kafka-streams-close-thread");
 
             shutdownThread.setDaemon(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b9d542b..c83a9bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,164 +16,438 @@
  */
 package org.apache.kafka.streams;
 
-import java.time.Duration;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
-import org.apache.kafka.test.MockStateRestoreListener;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-import java.io.File;
-import java.util.Arrays;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Collections.singletonList;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@Category({IntegrationTest.class})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KafkaStreams.class, StreamThread.class})
 public class KafkaStreamsTest {
 
-    private static final int NUM_BROKERS = 1;
     private static final int NUM_THREADS = 2;
-    // We need this to avoid the KafkaConsumer hanging on poll
-    // (this may occur if the test doesn't complete quickly enough)
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final StreamsBuilder builder = new StreamsBuilder();
-    private KafkaStreams globalStreams;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private MockClientSupplier supplier;
+    private MockTime time;
+
     private Properties props;
 
+    @Mock
+    private StateDirectory stateDirectory;
+    @Mock
+    private StreamThread streamThreadOne;
+    @Mock
+    private StreamThread streamThreadTwo;
+    @Mock
+    private GlobalStreamThread globalStreamThread;
+    @Mock
+    private ScheduledExecutorService cleanupSchedule;
+    @Mock
+    private Metrics metrics;
+
+    private StateListenerStub streamsStateListener;
+    private Capture<List<MetricsReporter>> metricsReportersCapture;
+    private Capture<StreamThread.StateListener> threadStatelistenerCapture;
+
+    public static class StateListenerStub implements KafkaStreams.StateListener {
+        int numChanges = 0;
+        KafkaStreams.State oldState;
+        KafkaStreams.State newState;
+        public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
+
+        @Override
+        public void onChange(final KafkaStreams.State newState,
+                             final KafkaStreams.State oldState) {
+            final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
+            numChanges++;
+            this.oldState = oldState;
+            this.newState = newState;
+            mapStates.put(newState, prevCount + 1);
+        }
+    }
+
     @Before
-    public void before() {
+    public void before() throws Exception {
+        time = new MockTime();
+        supplier = new MockClientSupplier();
+        supplier.setClusterForAdminClient(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999))));
+        streamsStateListener = new StateListenerStub();
+        threadStatelistenerCapture = EasyMock.newCapture();
+        metricsReportersCapture = EasyMock.newCapture();
+
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.put(StreamsConfig.CLIENT_ID_CONFIG, "clientId");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018");
         props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
-        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        globalStreams = new KafkaStreams(builder.build(), props);
-    }
 
-    @After
-    public void cleanup() {
-        if (globalStreams != null) {
-            globalStreams.close();
-        }
+        prepareStreams();
     }
 
-    @Test
-    public void testOsDefaultSocketBufferSizes() {
-        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
-        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.close();
+    private void prepareStreams() throws Exception {
+        // setup metrics
+        PowerMock.expectNew(Metrics.class,
+            anyObject(MetricConfig.class),
+            EasyMock.capture(metricsReportersCapture),
+            EasyMock.anyObject(Time.class)
+        ).andAnswer(() -> {
+            for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
+                reporter.init(Collections.emptyList());
+            }
+            return metrics;
+        }).anyTimes();
+        metrics.close();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
+                reporter.close();
+            }
+            return null;
+        }).anyTimes();
+
+        // setup stream threads
+        PowerMock.mockStatic(StreamThread.class);
+        EasyMock.expect(StreamThread.create(
+            anyObject(InternalTopologyBuilder.class),
+            anyObject(StreamsConfig.class),
+            anyObject(KafkaClientSupplier.class),
+            anyObject(AdminClient.class),
+            anyObject(UUID.class),
+            anyObject(String.class),
+            anyObject(Metrics.class),
+            anyObject(Time.class),
+            anyObject(StreamsMetadataState.class),
+            anyLong(),
+            anyObject(StateDirectory.class),
+            anyObject(StateRestoreListener.class)
+        )).andReturn(streamThreadOne).andReturn(streamThreadTwo);
+
+        EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
+        EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
+        prepareStreamThread(streamThreadOne, true);
+        prepareStreamThread(streamThreadTwo, false);
+
+        // setup global threads
+        final AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED);
+        PowerMock.expectNew(GlobalStreamThread.class,
+            anyObject(ProcessorTopology.class),
+            anyObject(StreamsConfig.class),
+            anyObject(Consumer.class),
+            anyObject(StateDirectory.class),
+            anyLong(),
+            anyObject(Metrics.class),
+            anyObject(Time.class),
+            anyString(),
+            anyObject(StateRestoreListener.class)
+        ).andReturn(globalStreamThread).anyTimes();
+        EasyMock.expect(globalStreamThread.state()).andAnswer(globalThreadState::get).anyTimes();
+        globalStreamThread.setStateListener(EasyMock.capture(threadStatelistenerCapture));
+        EasyMock.expectLastCall().anyTimes();
+
+        globalStreamThread.start();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            globalThreadState.set(GlobalStreamThread.State.RUNNING);
+            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
+                GlobalStreamThread.State.RUNNING,
+                GlobalStreamThread.State.CREATED);
+            return null;
+        }).anyTimes();
+        globalStreamThread.shutdown();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            supplier.restoreConsumer.close();
+            for (final MockProducer producer : supplier.producers) {
+                producer.close();
+            }
+            globalThreadState.set(GlobalStreamThread.State.DEAD);
+            if (threadStatelistenerCapture.getValue() != null) {
+                threadStatelistenerCapture.getValue().onChange(globalStreamThread,
+                    GlobalStreamThread.State.PENDING_SHUTDOWN,
+                    GlobalStreamThread.State.RUNNING
+                );
+                threadStatelistenerCapture.getValue().onChange(globalStreamThread,
+                    GlobalStreamThread.State.DEAD,
+                    GlobalStreamThread.State.PENDING_SHUTDOWN
+                );
+            }
+            return null;
+        }).anyTimes();
+        EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING).anyTimes();
+        globalStreamThread.join();
+        EasyMock.expectLastCall().anyTimes();
+
+        PowerMock.replay(StreamThread.class, Metrics.class, metrics, streamThreadOne, streamThreadTwo, GlobalStreamThread.class, globalStreamThread);
+    }
+
+    private void prepareStreamThread(final StreamThread thread, final boolean terminable) throws Exception {
+        final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
+        EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
+
+        thread.setStateListener(EasyMock.capture(threadStatelistenerCapture));
+        EasyMock.expectLastCall().anyTimes();
+
+        thread.start();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            state.set(StreamThread.State.CREATED);
+            threadStatelistenerCapture.getValue().onChange(thread,
+                StreamThread.State.RUNNING,
+                StreamThread.State.CREATED);
+            threadStatelistenerCapture.getValue().onChange(thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+            threadStatelistenerCapture.getValue().onChange(thread,
+                StreamThread.State.PARTITIONS_ASSIGNED,
+                StreamThread.State.PARTITIONS_REVOKED);
+            threadStatelistenerCapture.getValue().onChange(thread,
+                StreamThread.State.RUNNING,
+                StreamThread.State.PARTITIONS_ASSIGNED);
+            return null;
+        }).anyTimes();
+        thread.shutdown();
+        EasyMock.expectLastCall().andAnswer(() -> {
+            supplier.consumer.close();
+            supplier.restoreConsumer.close();
+            for (final MockProducer producer : supplier.producers) {
+                producer.close();
+            }
+            state.set(StreamThread.State.DEAD);
+            if (threadStatelistenerCapture.getValue() != null) {
+                threadStatelistenerCapture.getValue()
+                    .onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
+                threadStatelistenerCapture.getValue()
+                    .onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
+            }
+            return null;
+        }).anyTimes();
+        EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes();
+        thread.join();
+        if (terminable)
+            EasyMock.expectLastCall().anyTimes();
+        else
+            EasyMock.expectLastCall().andAnswer(() -> {
+                Thread.sleep(50L);
+                return null;
+            }).anyTimes();
     }
 
-    @Test(expected = KafkaException.class)
-    public void testInvalidSocketSendBufferSize() {
-        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+    @Test
+    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
         streams.close();
-    }
 
-    @Test(expected = KafkaException.class)
-    public void testInvalidSocketReceiveBufferSize() {
-        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -2);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.close();
+        assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
     @Test
-    public void testStateChanges() throws InterruptedException {
-        final StateListenerStub stateListener = new StateListenerStub();
-        globalStreams.setStateListener(stateListener);
+    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.setStateListener(streamsStateListener);
+
+        assertEquals(0, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.CREATED, streams.state());
 
-        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
-        Assert.assertEquals(stateListener.numChanges, 0);
+        streams.start();
 
-        globalStreams.start();
         TestUtils.waitForCondition(
-            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
-            10 * 1000,
+            () -> streamsStateListener.numChanges == 5,
             "Streams never started.");
+        assertEquals(KafkaStreams.State.RUNNING, streams.state());
 
-        globalStreams.close();
+        for (final StreamThread thread: streams.threads) {
+            threadStatelistenerCapture.getValue().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+        }
+
+        assertEquals(6, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.REBALANCING, streams.state());
+
+        for (final StreamThread thread : streams.threads) {
+            threadStatelistenerCapture.getValue().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_ASSIGNED,
+                StreamThread.State.PARTITIONS_REVOKED);
+        }
+
+        assertEquals(6, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.REBALANCING, streams.state());
+
+        threadStatelistenerCapture.getValue().onChange(
+            streams.threads[NUM_THREADS - 1],
+            StreamThread.State.PENDING_SHUTDOWN,
+            StreamThread.State.PARTITIONS_ASSIGNED);
 
-        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING);
+        threadStatelistenerCapture.getValue().onChange(
+            streams.threads[NUM_THREADS - 1],
+            StreamThread.State.DEAD,
+            StreamThread.State.PENDING_SHUTDOWN);
+
+        assertEquals(6, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.REBALANCING, streams.state());
+
+        for (final StreamThread thread : streams.threads) {
+            if (thread != streams.threads[NUM_THREADS - 1]) {
+                threadStatelistenerCapture.getValue().onChange(
+                    thread,
+                    StreamThread.State.RUNNING,
+                    StreamThread.State.PARTITIONS_ASSIGNED);
+            }
+        }
+
+        assertEquals(7, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.RUNNING, streams.state());
+        streams.close();
+
+        TestUtils.waitForCondition(
+            () -> streamsStateListener.numChanges == 9,
+            "Streams never closed.");
+        assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
     @Test
-    public void testStateCloseAfterCreate() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.setStateListener(streamsStateListener);
 
-        try {
-            final StateListenerStub stateListener = new StateListenerStub();
-            streams.setStateListener(stateListener);
-        } finally {
-            streams.close();
+        assertEquals(0, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.CREATED, streams.state());
+
+        streams.start();
+
+        TestUtils.waitForCondition(
+            () -> streamsStateListener.numChanges == 5,
+            "Streams never started.");
+        assertEquals(KafkaStreams.State.RUNNING, streams.state());
+
+        for (final StreamThread thread : streams.threads) {
+            threadStatelistenerCapture.getValue().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+        }
+
+        assertEquals(6, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.REBALANCING, streams.state());
+
+        threadStatelistenerCapture.getValue().onChange(
+            streams.threads[NUM_THREADS - 1],
+            StreamThread.State.PENDING_SHUTDOWN,
+            StreamThread.State.PARTITIONS_REVOKED);
+
+        threadStatelistenerCapture.getValue().onChange(
+            streams.threads[NUM_THREADS - 1],
+            StreamThread.State.DEAD,
+            StreamThread.State.PENDING_SHUTDOWN);
+
+        assertEquals(6, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.REBALANCING, streams.state());
+
+        for (final StreamThread thread : streams.threads) {
+            if (thread != streams.threads[NUM_THREADS - 1]) {
+                threadStatelistenerCapture.getValue().onChange(
+                    thread,
+                    StreamThread.State.PENDING_SHUTDOWN,
+                    StreamThread.State.PARTITIONS_REVOKED);
+
+                threadStatelistenerCapture.getValue().onChange(
+                    thread,
+                    StreamThread.State.DEAD,
+                    StreamThread.State.PENDING_SHUTDOWN);
+            }
         }
 
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
+        assertEquals(7, streamsStateListener.numChanges);
+        assertEquals(KafkaStreams.State.ERROR, streams.state());
+
+        streams.close();
+
+        // the state should not get stuck in ERROR, but should transition to NOT_RUNNING in the end
+        TestUtils.waitForCondition(
+            () -> streamsStateListener.numChanges == 9,
+            "Streams never closed.");
+        assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
     @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("anyTopic");
-        final List<Node> nodes = Arrays.asList(new Node(0, "localhost", 8121));
-        final Cluster cluster = new Cluster("mockClusterId", nodes,
-                                            Collections.emptySet(), Collections.<String>emptySet(),
-                                            Collections.emptySet(), nodes.get(0));
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-        clientSupplier.setClusterForAdminClient(cluster);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
         streams.close();
+
         TestUtils.waitForCondition(
             () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
             10 * 1000,
             "Streams never stopped.");
 
-        // Ensure that any created clients are closed
-        assertTrue(clientSupplier.consumer.closed());
-        assertTrue(clientSupplier.restoreConsumer.closed());
-        for (final MockProducer p : clientSupplier.producers) {
+        assertTrue(supplier.consumer.closed());
+        assertTrue(supplier.restoreConsumer.closed());
+        for (final MockProducer p : supplier.producers) {
             assertTrue(p.closed());
         }
     }
@@ -181,15 +455,12 @@ public class KafkaStreamsTest {
     @Test
     public void testStateThreadClose() throws Exception {
         // make sure we have the global state thread running too
+        final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
 
         try {
-            final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
-            threadsField.setAccessible(true);
-            final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
-
-            assertEquals(NUM_THREADS, threads.length);
+            assertEquals(NUM_THREADS, streams.threads.length);
             assertEquals(streams.state(), KafkaStreams.State.CREATED);
 
             streams.start();
@@ -199,13 +470,11 @@ public class KafkaStreamsTest {
                 "Streams never started.");
 
             for (int i = 0; i < NUM_THREADS; i++) {
-                final StreamThread tmpThread = threads[i];
+                final StreamThread tmpThread = streams.threads[i];
                 tmpThread.shutdown();
-                TestUtils.waitForCondition(
-                    () -> tmpThread.state() == StreamThread.State.DEAD,
-                    10 * 1000,
+                TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
                     "Thread never stopped.");
-                threads[i].join();
+                streams.threads[i].join();
             }
             TestUtils.waitForCondition(
                 () -> streams.state() == KafkaStreams.State.ERROR,
@@ -220,17 +489,15 @@ public class KafkaStreamsTest {
             10 * 1000,
             "Streams never stopped.");
 
-        final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
-        globalThreadField.setAccessible(true);
-        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
-        assertNull(globalStreamThread);
+        assertNull(streams.globalStreamThread);
     }
 
     @Test
     public void testStateGlobalThreadClose() throws Exception {
         // make sure we have the global state thread running too
+        final StreamsBuilder builder = new StreamsBuilder();
         builder.globalTable("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, 
supplier, time);
 
         try {
             streams.start();
@@ -238,9 +505,8 @@ public class KafkaStreamsTest {
                 () -> streams.state() == KafkaStreams.State.RUNNING,
                 10 * 1000,
                 "Streams never started.");
-            final java.lang.reflect.Field globalThreadField = 
streams.getClass().getDeclaredField("globalStreamThread");
-            globalThreadField.setAccessible(true);
-            final GlobalStreamThread globalStreamThread = (GlobalStreamThread) 
globalThreadField.get(streams);
+
+            final GlobalStreamThread globalStreamThread = 
streams.globalStreamThread;
             globalStreamThread.shutdown();
             TestUtils.waitForCondition(
                 () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
@@ -256,55 +522,9 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
-        final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
-        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
-
-        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 200);
-
-        // make sure we have the global state thread running too
-        builder.globalTable("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
-            streams.start();
-            fail("expected start() to time out and throw an exception.");
-        } catch (final StreamsException expected) {
-            // This is a result of not being able to connect to the broker.
-        } finally {
-            streams.close();
-        }
-        // There's nothing to assert... We're testing that this operation actually completes.
-    }
-
-    @Test
-    public void testLocalThreadCloseWithoutConnectingToBroker() {
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
-        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
-
-        // make sure we have the global state thread running too
-        builder.table("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
-            streams.start();
-        } finally {
-            streams.close();
-        }
-        // There's nothing to assert... We're testing that this operation actually completes.
-    }
-
-
-    @Test
     public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
 
         try {
             final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
@@ -322,59 +542,50 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCloseIsIdempotent() {
-        globalStreams.close();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        globalStreams.close();
+        streams.close();
         Assert.assertEquals("subsequent close() calls should do nothing",
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        globalStreams.start();
-        globalStreams.close();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.start();
+        streams.close();
         try {
-            globalStreams.start();
+            streams.start();
             fail("Should have throw IllegalStateException");
         } catch (final IllegalStateException expected) {
             // this is ok
         } finally {
-            globalStreams.close();
-        }
-    }
-
-    @Test
-    public void testCannotStartTwice() {
-        globalStreams.start();
-
-        try {
-            globalStreams.start();
-        } catch (final IllegalStateException e) {
-            // this is ok
-        } finally {
-            globalStreams.close();
+            streams.close();
         }
     }
 
     @Test
     public void shouldNotSetGlobalRestoreListenerAfterStarting() {
-        globalStreams.start();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.start();
         try {
-            globalStreams.setGlobalStateRestoreListener(new MockStateRestoreListener());
+            streams.setGlobalStateRestoreListener(null);
             fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
         } finally {
-            globalStreams.close();
+            streams.close();
         }
     }
 
     @Test
     public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
-        globalStreams.start();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.start();
         try {
-            globalStreams.setUncaughtExceptionHandler(null);
+            streams.setUncaughtExceptionHandler(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
@@ -383,9 +594,10 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
-        globalStreams.start();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.start();
         try {
-            globalStreams.setStateListener(null);
+            streams.setStateListener(null);
             fail("Should throw IllegalStateException");
         } catch (final IllegalStateException e) {
             // expected
@@ -393,212 +605,208 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testIllegalMetricsConfig() {
-        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
-
+    public void shouldAllowCleanupBeforeStartAndAfterClose() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
         try {
-            new KafkaStreams(builder.build(), props);
-            fail("Should have throw ConfigException");
-        } catch (final ConfigException expected) { /* expected */ }
+            streams.cleanUp();
+            streams.start();
+        } finally {
+            streams.close();
+            streams.cleanUp();
+        }
     }
 
     @Test
-    public void testLegalMetricsConfig() {
-        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-        new KafkaStreams(builder.build(), props).close();
+    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.start();
+        TestUtils.waitForCondition(
+            () -> streams.state() == KafkaStreams.State.RUNNING,
+            "Streams never started.");
 
-        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
-        new KafkaStreams(builder.build(), props).close();
+        try {
+            streams.cleanUp();
+            fail("Should have thrown IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            assertEquals("Cannot clean up while running.", 
expected.getMessage());
+        }
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() {
-        globalStreams.allMetadata();
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
-        globalStreams.allMetadataForStore("store");
+    public void shouldNotGetAllTasksWithStoreWhenNotRunningOrRebalancing() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
-        globalStreams.metadataForKey("store", "key", Serdes.String().serializer());
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunningOrRebalancing() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
-        globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
-    }
-
-    @Test
-    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
-        final AtomicBoolean keepRunning = new AtomicBoolean(true);
-        KafkaStreams streams = null;
-        try {
-            final StreamsBuilder builder = new StreamsBuilder();
-            final CountDownLatch latch = new CountDownLatch(1);
-            final String topic = "input";
-            CLUSTER.createTopics(topic);
-
-            builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-                    .foreach((key, value) -> {
-                        try {
-                            latch.countDown();
-                            while (keepRunning.get()) {
-                                Thread.sleep(10);
-                            }
-                        } catch (final InterruptedException e) {
-                            // no-op
-                        }
-                    });
-            streams = new KafkaStreams(builder.build(), props);
-            streams.start();
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
-                Collections.singletonList(new KeyValue<>("A", "A")),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    StringSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                System.currentTimeMillis());
-
-            assertTrue("Timed out waiting to receive single message", 
latch.await(30, TimeUnit.SECONDS));
-            assertFalse(streams.close(Duration.ofMillis(10)));
-        } finally {
-            // stop the thread so we don't interfere with other tests etc
-            keepRunning.set(false);
-            if (streams != null) {
-                streams.close();
-            }
-        }
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunningOrRebalancing() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        streams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
     }
 
     @Test
-    public void shouldReturnThreadMetadata() {
-        globalStreams.start();
-        final Set<ThreadMetadata> threadMetadata = globalStreams.localThreadsMetadata();
-        assertNotNull(threadMetadata);
-        assertEquals(2, threadMetadata.size());
-        for (final ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
-                Utils.mkList("RUNNING", "PARTITIONS_REVOKED", 
"PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
-            assertEquals(0, metadata.standbyTasks().size());
-            assertEquals(0, metadata.activeTasks().size());
-        }
+    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
+        // do not use mock time so that it can really elapse
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier);
+        assertFalse(streams.close(Duration.ofMillis(10L)));
     }
 
-    @Test
-    public void shouldAllowCleanupBeforeStartAndAfterClose() {
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowOnNegativeTimeoutForClose() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
         try {
-            globalStreams.cleanUp();
-            globalStreams.start();
+            streams.close(Duration.ofMillis(-1L));
         } finally {
-            globalStreams.close();
+            streams.close();
         }
-        globalStreams.cleanUp();
     }
 
     @Test
-    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
-        globalStreams.start();
-        TestUtils.waitForCondition(
-            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
-            10 * 1000,
-            "Streams never started.");
-
+    public void shouldNotBlockInCloseForZeroDuration() {
+        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
         try {
-            globalStreams.cleanUp();
-            fail("Should have thrown IllegalStateException");
-        } catch (final IllegalStateException expected) {
-            assertEquals("Cannot clean up while running.", 
expected.getMessage());
+            // with mock time that does not elapse, close would not return if it ever waits on the state transition
+            assertFalse(streams.close(Duration.ZERO));
+        } finally {
+            streams.close();
         }
     }
 
     @Test
-    public void shouldCleanupOldStateDirs() throws InterruptedException {
-        props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
+    public void shouldCleanupOldStateDirs() throws Exception {
+        PowerMock.mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = EasyMock.mock(ScheduledExecutorService.class);
+        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
+            anyObject(ThreadFactory.class)
+        )).andReturn(cleanupSchedule).anyTimes();
+        EasyMock.expect(cleanupSchedule.scheduleAtFixedRate(
+            EasyMock.anyObject(Runnable.class),
+            EasyMock.eq(1L),
+            EasyMock.eq(1L),
+            EasyMock.eq(TimeUnit.MILLISECONDS)
+        )).andReturn(null);
+        EasyMock.expect(cleanupSchedule.shutdownNow()).andReturn(null);
+        PowerMock.expectNew(StateDirectory.class,
+            anyObject(StreamsConfig.class),
+            anyObject(Time.class)
+        ).andReturn(stateDirectory);
+        PowerMock.replayAll(Executors.class, cleanupSchedule, stateDirectory);
 
-        final String topic = "topic";
-        CLUSTER.createTopic(topic);
+        props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
         final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("topic", Materialized.as("store"));
 
-        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
-        builder.table(topic, consumed);
-
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            streams.setStateListener((newState, oldState) -> {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    latch.countDown();
-                }
-            });
-            final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
-            final File oldTaskDir = new File(appDir, "10_1");
-            assertTrue(oldTaskDir.mkdirs());
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.start();
+        streams.close();
 
-            streams.start();
-            latch.await(30, TimeUnit.SECONDS);
-            verifyCleanupStateDir(appDir, oldTaskDir);
-            assertTrue(oldTaskDir.mkdirs());
-            verifyCleanupStateDir(appDir, oldTaskDir);
-        } finally {
-            streams.close();
-        }
+        PowerMock.verify(Executors.class, cleanupSchedule);
     }
 
     @Test
-    public void shouldThrowOnNegativeTimeoutForClose() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
-            streams.close(Duration.ofMillis(-1L));
-            fail("should not accept negative close parameter");
-        } catch (final IllegalArgumentException e) {
-            // expected
-        } finally {
-            streams.close();
-        }
+    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), 
Serdes.String().deserializer(), inputTopic)
+                .addProcessor("process", () -> new AbstractProcessor<String, 
String>() {
+                    @Override
+                    public void process(final String key, final String value) {
+                        if (value.length() % 2 == 0) {
+                            context().forward(key, key + value);
+                        }
+                    }
+                }, "source")
+                .addSink("sink", outputTopic, new StringSerializer(), new 
StringSerializer(), "process");
+        startStreamsAndCheckDirExists(topology);
     }
 
     @Test
-    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
+    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
+        startStreamsAndCheckDirExists(topology);
+    }
 
-        th.start();
+    @Test
+    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
+        startStreamsAndCheckDirExists(topology);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Topology getStatefulTopology(final String inputTopic,
+                                         final String outputTopic,
+                                         final String globalTopicName,
+                                         final String storeName,
+                                         final String globalStoreName,
+                                         final boolean isPersistentStore) {
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+            isPersistentStore ?
+                Stores.persistentKeyValueStore(storeName)
+                : Stores.inMemoryKeyValueStore(storeName),
+            Serdes.String(),
+            Serdes.Long());
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), 
Serdes.String().deserializer(), inputTopic)
+            .addProcessor("process", () -> new AbstractProcessor<String, 
String>() {
+                @Override
+                public void process(final String key, final String value) {
+                    final KeyValueStore<String, Long> kvStore =
+                        (KeyValueStore<String, Long>) context().getStateStore(storeName);
+                    kvStore.put(key, 5L);
+
+                    context().forward(key, "5");
+                    context().commit();
+                }
+            }, "source")
+            .addStateStore(storeBuilder, "process")
+            .addSink("sink", outputTopic, new StringSerializer(), new 
StringSerializer(), "process");
 
-        try {
-            th.join(30_000L);
-            assertFalse(th.isAlive());
-        } finally {
-            streams.close();
-        }
+        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
+            isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName),
+            Serdes.String(), Serdes.String()).withLoggingDisabled();
+        topology.addGlobalStore(globalStoreBuilder,
+            "global",
+            Serdes.String().deserializer(),
+            Serdes.String().deserializer(),
+            globalTopicName,
+            globalTopicName + "-processor",
+            new MockProcessorSupplier());
+        return topology;
     }
 
-    private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
-        final File taskDir = new File(appDir, "0_0");
-        TestUtils.waitForCondition(
-            () -> !oldTaskDir.exists() && taskDir.exists(),
-            30000,
-            "cleanup has not successfully run");
-        assertTrue(taskDir.exists());
-    }
+    private void startStreamsAndCheckDirExists(final Topology topology) throws Exception {
+        PowerMock.expectNew(StateDirectory.class,
+            anyObject(StreamsConfig.class),
+            anyObject(Time.class)
+        ).andReturn(stateDirectory);
 
-    public static class StateListenerStub implements KafkaStreams.StateListener {
-        int numChanges = 0;
-        KafkaStreams.State oldState;
-        KafkaStreams.State newState;
-        public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
+        PowerMock.replayAll();
 
-        @Override
-        public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-            final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
-            numChanges++;
-            this.oldState = oldState;
-            this.newState = newState;
-            mapStates.put(newState, prevCount + 1);
-        }
-    }
+        new KafkaStreams(topology, props, supplier, time);
 
+        PowerMock.verifyAll();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 138f901..9ab9cf5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -75,6 +76,31 @@ public class StreamsConfigTest {
     }
 
     @Test(expected = ConfigException.class)
+    public void testIllegalMetricsRecordingLevel() {
+        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
+        new StreamsConfig(props);
+    }
+
+    @Test
+    public void testOsDefaultSocketBufferSizes() {
+        props.put(StreamsConfig.SEND_BUFFER_CONFIG, CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND);
+        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND);
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidSocketSendBufferSize() {
+        props.put(StreamsConfig.SEND_BUFFER_CONFIG, -2);
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidSocketReceiveBufferSize() {
+        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, -2);
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
     public void shouldThrowExceptionIfApplicationIdIsNotSet() {
         props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
         new StreamsConfig(props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2bdb353..8afaa44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -228,7 +228,7 @@ public class StreamThreadTest {
     }
 
     private Cluster createCluster() {
-        final Node node = new Node(0, "localhost", 8121);
+        final Node node = new Node(-1, "localhost", 8121);
         return new Cluster(
             "mockClusterId",
             singletonList(node),
@@ -1014,10 +1014,15 @@ public class StreamThreadTest {
 
         thread.runOnce();
 
-        final ThreadMetadata threadMetadata = thread.threadMetadata();
-        assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
-        assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1))));
-        assertTrue(threadMetadata.standbyTasks().isEmpty());
+        final ThreadMetadata metadata = thread.threadMetadata();
+        assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
+        assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1))));
+        assertTrue(metadata.standbyTasks().isEmpty());
+
+        assertTrue("#threadState() was: " + metadata.threadState() + "; 
expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or 
CREATED",
+            Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", 
"PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
+        final String threadName = metadata.threadName();
+        assertTrue(threadName.startsWith("clientId-StreamThread-"));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index d3430f2..c712e80 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -58,7 +58,7 @@ public class MockClientSupplier implements KafkaClientSupplier {
 
     @Override
     public AdminClient getAdminClient(final Map<String, Object> config) {
-        return new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
+        return new MockAdminClient(cluster.nodes(), cluster.nodeById(-1));
     }
 
     @Override
