[ https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701041#comment-16701041 ]

ASF GitHub Bot commented on KAFKA-7367:
---------------------------------------

mjsax closed pull request #5696: KAFKA-7367: Streams should not create state store directories unless they are needed
URL: https://github.com/apache/kafka/pull/5696

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbda11de150..c29b7bc4244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -644,12 +644,6 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
 
-        try {
-            stateDirectory = new StateDirectory(config, time);
-        } catch (final ProcessorStateException fatal) {
-            throw new StreamsException(fatal);
-        }
-
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -664,7 +658,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         internalTopologyBuilder.rewriteTopology(config);
 
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
-        internalTopologyBuilder.build();
+        final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
@@ -680,6 +674,14 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
+        final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
+                (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+
+        try {
+            stateDirectory = new StateDirectory(config, time, createStateDirectory);
+        } catch (final ProcessorStateException fatal) {
+            throw new StreamsException(fatal);
+        }
 
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8fcbbb44302..57af1f3f340 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -152,6 +152,24 @@ boolean isRepartitionTopic(final String topic) {
         return repartitionTopics.contains(topic);
     }
 
+    public boolean hasPersistentLocalStore() {
+        for (final StateStore store : stateStores) {
+            if (store.persistent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasPersistentGlobalStore() {
+        for (final StateStore store : globalStateStores) {
+            if (store.persistent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a3227ca0c7d..f5c4c31d083 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -26,7 +26,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
@@ -50,6 +49,7 @@
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
+    private final boolean createStateDirectory;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
     private final Time time;
@@ -71,19 +71,21 @@
      * Ensures that the state base directory as well as the application's sub-directory are created.
      *
      * @throws ProcessorStateException if the base state directory or application state directory does not exist
-     *                                 and could not be created
+     *                                 and could not be created when createStateDirectory is enabled.
      */
     public StateDirectory(final StreamsConfig config,
-                          final Time time) {
+                          final Time time,
+                          final boolean createStateDirectory) {
         this.time = time;
+        this.createStateDirectory = createStateDirectory;
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (!baseDir.exists() && !baseDir.mkdirs()) {
+        if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
         }
         stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        if (!stateDir.exists() && !stateDir.mkdir()) {
+        if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
         }
@@ -96,7 +98,7 @@ public StateDirectory(final StreamsConfig config,
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (!taskDir.exists() && !taskDir.mkdir()) {
+        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
         }
@@ -110,7 +112,7 @@ public File directoryForTask(final TaskId taskId) {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (!dir.exists() && !dir.mkdir()) {
+        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
         }
@@ -128,6 +130,9 @@ private String logPrefix() {
      * @throws IOException
      */
     synchronized boolean lock(final TaskId taskId) throws IOException {
+        if (!createStateDirectory) {
+            return true;
+        }
 
         final File lockFile;
         // we already have the lock so bail out here
@@ -169,6 +174,10 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
     }
 
     synchronized boolean lockGlobalState() throws IOException {
+        if (!createStateDirectory) {
+            return true;
+        }
+
         if (globalStateLock != null) {
             log.trace("{} Found cached state dir lock for the global task", logPrefix());
             return true;
@@ -234,7 +243,9 @@ public synchronized void clean() {
             throw new StreamsException(e);
         }
         try {
-            Utils.delete(globalStateDir().getAbsoluteFile());
+            if (stateDir.exists()) {
+                Utils.delete(globalStateDir().getAbsoluteFile());
+            }
         } catch (final IOException e) {
             log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
             throw new StreamsException(e);
@@ -320,12 +331,8 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
      * @return The list of all the existing local directories for stream tasks
      */
     File[] listTaskDirectories() {
-        return stateDir.listFiles(new FileFilter() {
-            @Override
-            public boolean accept(final File pathname) {
-                return pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches();
-            }
-        });
+        return !stateDir.exists() ? new File[0] :
+                stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
     }
 
     private FileChannel getOrCreateFileChannel(final TaskId taskId,
@@ -344,6 +351,4 @@ private FileLock tryLock(final FileChannel channel) throws IOException {
         }
     }
 
-
-
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index fde5bff9728..3ca0dcf9b29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -27,27 +30,37 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +70,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -79,6 +93,9 @@
     private KafkaStreams globalStreams;
     private Properties props;
 
+    @Rule
+    public TestName testName = new TestName();
+
     @Before
     public void before() {
         props = new Properties();
@@ -125,8 +142,8 @@ public void testStateChanges() throws InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
-        Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED);
-        Assert.assertEquals(stateListener.numChanges, 0);
+        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
+        Assert.assertEquals(0, stateListener.numChanges);
 
         globalStreams.start();
         TestUtils.waitForCondition(
@@ -136,7 +153,7 @@ public void testStateChanges() throws InterruptedException {
 
         globalStreams.close();
 
-        Assert.assertEquals(globalStreams.state(), 
KafkaStreams.State.NOT_RUNNING);
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, 
globalStreams.state());
     }
 
     @Test
@@ -158,7 +175,7 @@ public void 
shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception
         builder.globalTable("anyTopic");
         final List<Node> nodes = asList(new Node(0, "localhost", 8121));
         final Cluster cluster = new Cluster("mockClusterId", nodes,
-                                            Collections.emptySet(), 
Collections.<String>emptySet(),
+                                            Collections.emptySet(), 
Collections.emptySet(),
                                             Collections.emptySet(), 
nodes.get(0));
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setClusterForAdminClient(cluster);
@@ -349,6 +366,7 @@ public void testCannotStartTwice() {
 
         try {
             globalStreams.start();
+            fail("Should throw an IllegalStateException");
         } catch (final IllegalStateException e) {
             // this is ok
         } finally {
@@ -575,6 +593,141 @@ public void shouldNotBlockInCloseForZeroDuration() throws 
InterruptedException {
         }
     }
 
+    @Test
+    public void statelessTopologyShouldNotCreateStateDirectory() throws 
Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        CLUSTER.createTopics(inputTopic, outputTopic);
+
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), 
Serdes.String().deserializer(), inputTopic)
+                .addProcessor("process", () -> new AbstractProcessor<String, 
String>() {
+                    @Override
+                    public void process(final String key, final String value) {
+                        if (value.length() % 2 == 0) {
+                            context().forward(key, key + value);
+                        }
+                    }
+                }, "source")
+                .addSink("sink", outputTopic, new StringSerializer(), new 
StringSerializer(), "process");
+        startStreamsAndCheckDirExists(topology, 
Collections.singleton(inputTopic), outputTopic, false);
+    }
+
+    @Test
+    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws 
Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + 
"-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, 
globalTopicName, storeName, globalStoreName, false);
+        startStreamsAndCheckDirExists(topology, asList(inputTopic, 
globalTopicName), outputTopic, false);
+    }
+
+    @Test
+    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
+        final String inputTopic = testName.getMethodName() + "-input";
+        final String outputTopic = testName.getMethodName() + "-output";
+        final String globalTopicName = testName.getMethodName() + "-global";
+        final String storeName = testName.getMethodName() + "-counts";
+        final String globalStoreName = testName.getMethodName() + 
"-globalStore";
+        final Topology topology = getStatefulTopology(inputTopic, outputTopic, 
globalTopicName, storeName, globalStoreName, true);
+        startStreamsAndCheckDirExists(topology, asList(inputTopic, 
globalTopicName), outputTopic, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Topology getStatefulTopology(final String inputTopic,
+                                         final String outputTopic,
+                                         final String globalTopicName,
+                                         final String storeName,
+                                         final String globalStoreName,
+                                         final boolean isPersistentStore) 
throws Exception {
+        CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = 
Stores.keyValueStoreBuilder(isPersistentStore ?
+                Stores.persistentKeyValueStore(storeName) : 
Stores.inMemoryKeyValueStore(storeName), 
+                Serdes.String(), Serdes.Long());
+        final Topology topology = new Topology();
+        topology.addSource("source", Serdes.String().deserializer(), 
Serdes.String().deserializer(), inputTopic)
+                .addProcessor("process", () -> new AbstractProcessor<String, 
String>() {
+                    @Override
+                    public void process(final String key, final String value) {
+                        final KeyValueStore<String, Long> kvStore =
+                                (KeyValueStore<String, Long>) 
context().getStateStore(storeName);
+                        kvStore.put(key, 5L);
+
+                        context().forward(key, "5");
+                        context().commit();
+                    }
+                }, "source")
+                .addStateStore(storeBuilder, "process")
+                .addSink("sink", outputTopic, new StringSerializer(), new 
StringSerializer(), "process");
+
+        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = 
Stores.keyValueStoreBuilder(
+                isPersistentStore ? 
Stores.persistentKeyValueStore(globalStoreName) : 
Stores.inMemoryKeyValueStore(globalStoreName), 
+                Serdes.String(), Serdes.String()).withLoggingDisabled();
+        topology.addGlobalStore(globalStoreBuilder,
+                "global",
+                Serdes.String().deserializer(),
+                Serdes.String().deserializer(),
+                globalTopicName,
+                globalTopicName + "-processor",
+                new MockProcessorSupplier());
+        return topology;
+    }
+
+    private void startStreamsAndCheckDirExists(final Topology topology,
+                                               final Collection<String> 
inputTopics,
+                                               final String outputTopic,
+                                               final boolean shouldFilesExist) 
throws Exception {
+        final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + 
"kafka-" + TestUtils.randomString(5));
+        final Path basePath = baseDir.toPath();
+        if (!baseDir.exists()) {
+            Files.createDirectory(basePath);
+        }
+        // changing the path of state directory to make sure that it should 
not clash with other test cases.
+        final Properties localProps = new Properties();
+        localProps.putAll(props);
+        localProps.put(StreamsConfig.STATE_DIR_CONFIG, 
baseDir.getAbsolutePath());
+
+        final KafkaStreams streams = new KafkaStreams(topology, localProps);
+        streams.start();
+
+        for (final String topic : inputTopics) {
+            
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
+                    Collections.singletonList(new KeyValue<>("A", "A")),
+                    TestUtils.producerConfig(
+                            CLUSTER.bootstrapServers(),
+                            StringSerializer.class,
+                            StringSerializer.class,
+                            new Properties()),
+                    System.currentTimeMillis());
+        }
+
+        IntegrationTestUtils.readKeyValues(outputTopic,
+                TestUtils.consumerConfig(
+                        CLUSTER.bootstrapServers(),
+                        outputTopic + "-group",
+                        StringDeserializer.class,
+                        StringDeserializer.class),
+                5000, 1);
+
+        try {
+            final List<Path> files = Files.find(basePath, 999, (p, bfa) -> 
!p.equals(basePath)).collect(Collectors.toList());
+            if (shouldFilesExist && files.isEmpty()) {
+                Assert.fail("Files should have existed, but it didn't: " + 
files);
+            }
+            if (!shouldFilesExist && !files.isEmpty()) {
+                Assert.fail("Files should not have existed, but it did: " + 
files);
+            }
+        } catch (final IOException e) {
+            Assert.fail("Couldn't read the state directory : " + 
baseDir.getPath());
+        } finally {
+            streams.close();
+            streams.cleanUp();
+            Utils.delete(baseDir);
+        }
+    }
+
     private void verifyCleanupStateDir(final String appDir, final File 
oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 81938f82d1a..c3c45db7de9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -132,7 +132,7 @@ public void shouldRestoreStateFromSourceTopic() throws 
Exception {
         createStateForRestoration(INPUT_STREAM);
         setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
 
-        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime());
+        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true);
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 0)), ".checkpoint"))
                 .write(Collections.singletonMap(new 
TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 1)), ".checkpoint"))
@@ -199,7 +199,7 @@ public void shouldRestoreStateFromChangelogTopic() throws 
Exception {
         createStateForRestoration(APPID + "-store-changelog");
         createStateForRestoration(INPUT_STREAM);
 
-        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime());
+        final StateDirectory stateDirectory = new StateDirectory(new 
StreamsConfig(props), new MockTime(), true);
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 0)), ".checkpoint"))
                 .write(Collections.singletonMap(new TopicPartition(APPID + 
"-store-changelog", 0), (long) offsetCheckpointed));
         new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new 
TaskId(0, 1)), ".checkpoint"))
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 4ed44be47f2..b3afa24a325 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -152,7 +152,7 @@ public void 
shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOExcep
         expect(store4.name()).andReturn(storeName4).anyTimes();
         EasyMock.replay(store4);
 
-        final StateDirectory stateDirectory = new 
StateDirectory(streamsConfig, new MockTime());
+        final StateDirectory stateDirectory = new 
StateDirectory(streamsConfig, new MockTime(), true);
         final AbstractTask task = createTask(
             consumer,
             new HashMap<StateStore, String>() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8789e2ba29a..8b02b3e66d2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -112,7 +112,7 @@ public void before() throws IOException {
                 put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
             }
         });
-        stateDirectory = new StateDirectory(streamsConfig, time);
+        stateDirectory = new StateDirectory(streamsConfig, time, true);
         consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
         stateManager = new GlobalStateManagerImpl(
             new LogContext("test"),
@@ -139,7 +139,7 @@ public void shouldLockGlobalStateDirectory() {
 
     @Test(expected = LockException.class)
     public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, 
time);
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, 
time, true);
         try {
             stateDir.lockGlobalState();
             stateManager.initialize();
@@ -357,8 +357,8 @@ public void 
shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
     @Test
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize();
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, new 
MockTime());
+        stateManager.close(Collections.emptyMap());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new 
MockTime(), true);
         try {
             // should be able to get the lock now as it should've been 
released in close
             assertTrue(stateDir.lockGlobalState());
@@ -419,7 +419,7 @@ public void 
shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOExcept
         } catch (final StreamsException e) {
             // expected
         }
-        final StateDirectory stateDir = new StateDirectory(streamsConfig, new 
MockTime());
+        final StateDirectory stateDir = new StateDirectory(streamsConfig, new 
MockTime(), true);
         try {
             // should be able to get the lock now as it should've been released
             assertTrue(stateDir.lockGlobalState());
@@ -511,7 +511,7 @@ public void 
shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir(
             new LogContext("mock"),
             topology,
             consumer,
-            new StateDirectory(streamsConfig, time) {
+            new StateDirectory(streamsConfig, time, true) {
                 @Override
                 public boolean lockGlobalState() throws IOException {
                     throw new IOException("KABOOM!");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 37a6fdba453..387179b56b8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -107,7 +107,7 @@ public String newStoreName(final String prefix) {
         globalStreamThread = new 
GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory(config, 
time),
+                                                    new StateDirectory(config, 
time, true),
                                                     0,
                                                     new Metrics(),
                                                     new MockTime(),
@@ -140,7 +140,7 @@ public void 
shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
         globalStreamThread = new 
GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory(config, 
time),
+                                                    new StateDirectory(config, 
time, true),
                                                     0,
                                                     new Metrics(),
                                                     new MockTime(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fbcb2c85b47..3d91ee08136 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -98,7 +98,7 @@ public void setup() {
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
             }
-        }), new MockTime());
+        }), new MockTime(), true);
         checkpointFile = new File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
         checkpoint = new OffsetCheckpoint(checkpointFile);
     }
@@ -565,7 +565,7 @@ public void close() {
     }
 
     @Test
-    public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws 
IOException {
+    public void shouldFlushAllStoresEvenIfStoreThrowsException() throws 
IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             Collections.singleton(changelogTopicPartition),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 587cae20179..11050fe6f55 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -38,7 +38,9 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -54,8 +56,10 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class ProcessorTopologyTest {
 
@@ -367,6 +371,58 @@ public void shouldAddHeaders() {
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L);
     }
 
+    @Test
+    public void statelessTopologyShouldNotHavePersistentStore() {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final ProcessorTopology processorTopology = 
topology.getInternalBuilder("anyAppId").build();
+        assertFalse(processorTopology.hasPersistentLocalStore());
+        assertFalse(processorTopology.hasPersistentGlobalStore());
+    }
+
+    @Test
+    public void inMemoryStoreShouldNotResultInPersistentLocalStore() {
+        final ProcessorTopology processorTopology = 
createLocalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+        assertFalse(processorTopology.hasPersistentLocalStore());
+    }
+
+    @Test
+    public void persistentLocalStoreShouldBeDetected() {
+        final ProcessorTopology processorTopology = 
createLocalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+        assertTrue(processorTopology.hasPersistentLocalStore());
+    }
+
+    @Test
+    public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
+        final ProcessorTopology processorTopology = 
createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
+        assertFalse(processorTopology.hasPersistentGlobalStore());
+    }
+
+    @Test
+    public void persistentGlobalStoreShouldBeDetected() {
+        final ProcessorTopology processorTopology = 
createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
+        assertTrue(processorTopology.hasPersistentGlobalStore());
+    }
+
+    private ProcessorTopology createLocalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final String processor = "processor";
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String());
+        topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
"topic")
+                .addProcessor(processor, () -> new 
StatefulProcessor(storeSupplier.name()), "source")
+                .addStateStore(storeBuilder, processor);
+        return topology.getInternalBuilder("anyAppId").build();
+    }
+
+    private ProcessorTopology createGlobalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
+        final TopologyWrapper topology = new TopologyWrapper();
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String()).withLoggingDisabled();
+        topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, 
STRING_DESERIALIZER, "topic", "processor",
+                define(new StatefulProcessor(storeSupplier.name())));
+        return topology.getInternalBuilder("anyAppId").build();
+    }
+
     private void assertNextOutputRecord(final String topic,
                                         final String key,
                                         final String value) {
@@ -416,12 +472,7 @@ private void assertNoOutputRecord(final String topic) {
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final 
Integer partition) {
-        return new StreamPartitioner<Object, Object>() {
-            @Override
-            public Integer partition(final String topic, final Object key, 
final Object value, final int numPartitions) {
-                return partition;
-            }
-        };
+        return (topic, key, value, numPartitions) -> partition;
     }
 
     private Topology createSimpleTopology(final int partition) {
@@ -619,12 +670,7 @@ public void close() {
     }
 
     private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> 
processor) {
-        return new ProcessorSupplier<K, V>() {
-            @Override
-            public Processor<K, V> get() {
-                return processor;
-            }
-        };
+        return () -> processor;
     }
 
     /**
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 0c24e2d637f..17b146f6fd0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -160,7 +160,7 @@ public void setup() throws Exception {
             new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new 
Node[0], new Node[0])
         ));
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(createConfig(baseDir), new 
MockTime());
+        stateDirectory = new StateDirectory(createConfig(baseDir), new 
MockTime(), true);
     }
 
     @After
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e14d01077ac..edb02c6224f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -52,9 +52,8 @@
     private StateDirectory directory;
     private File appDir;
 
-    @Before
-    public void before() {
-        stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
+    private void initializeStateDirectory(final boolean createStateDirectory) {
+        stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + 
TestUtils.randomString(5));
         directory = new StateDirectory(
             new StreamsConfig(new Properties() {
                 {
@@ -63,10 +62,15 @@ public void before() {
                     put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
                 }
             }),
-            time);
+            time, createStateDirectory);
         appDir = new File(stateDir, applicationId);
     }
 
+    @Before
+    public void before() {
+        initializeStateDirectory(true);
+    }
+
     @After
     public void cleanup() throws IOException {
         Utils.delete(stateDir);
@@ -138,7 +142,7 @@ public void shouldNotLockDeletedDirectory() throws 
IOException {
     }
     
     @Test
-    public void shouldLockMulitpleTaskDirectories() throws IOException {
+    public void shouldLockMultipleTaskDirectories() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);
         final TaskId taskId2 = new TaskId(1, 0);
@@ -254,7 +258,7 @@ public void shouldCreateDirectoriesIfParentDoesntExist() {
                     put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
                 }
             }),
-            time);
+            time, true);
         final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
         assertTrue(stateDir.exists());
         assertTrue(taskDir.exists());
@@ -296,14 +300,11 @@ public void shouldUnlockGlobalStateDirectory() throws 
IOException {
     public void shouldNotLockStateDirLockedByAnotherThread() throws 
IOException, InterruptedException {
         final TaskId taskId = new TaskId(0, 0);
         final AtomicReference<IOException> exceptionOnThread = new 
AtomicReference<>();
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    directory.lock(taskId);
-                } catch (final IOException e) {
-                    exceptionOnThread.set(e);
-                }
+        final Thread thread = new Thread(() -> {
+            try {
+                directory.lock(taskId);
+            } catch (final IOException e) {
+                exceptionOnThread.set(e);
             }
         });
         thread.start();
@@ -318,17 +319,14 @@ public void 
shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, I
         final CountDownLatch lockLatch = new CountDownLatch(1);
         final CountDownLatch unlockLatch = new CountDownLatch(1);
         final AtomicReference<Exception> exceptionOnThread = new 
AtomicReference<>();
-        final Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    directory.lock(taskId);
-                    lockLatch.countDown();
-                    unlockLatch.await();
-                    directory.unlock(taskId);
-                } catch (final Exception e) {
-                    exceptionOnThread.set(e);
-                }
+        final Thread thread = new Thread(() -> {
+            try {
+                directory.lock(taskId);
+                lockLatch.countDown();
+                unlockLatch.await();
+                directory.unlock(taskId);
+            } catch (final Exception e) {
+                exceptionOnThread.set(e);
             }
         });
         thread.start();
@@ -358,4 +356,39 @@ public void 
shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
         files = Arrays.asList(appDir.listFiles());
         assertEquals(0, files.size());
     }
+
+    @Test
+    public void shouldNotCreateBaseDirectory() {
+        initializeStateDirectory(false);
+        assertFalse(stateDir.exists());
+        assertFalse(appDir.exists());
+    }
+
+    @Test
+    public void shouldNotCreateTaskStateDirectory() {
+        initializeStateDirectory(false);
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        assertFalse(taskDirectory.exists());
+    }
+
+    @Test
+    public void shouldNotCreateGlobalStateDirectory() {
+        initializeStateDirectory(false);
+        final File globalStateDir = directory.globalStateDir();
+        assertFalse(globalStateDir.exists());
+    }
+
+    @Test
+    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() 
throws IOException {
+        initializeStateDirectory(false);
+        final TaskId taskId = new TaskId(0, 0);
+        assertTrue(directory.lock(taskId));
+    }
+
+    @Test
+    public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() 
throws IOException {
+        initializeStateDirectory(false);
+        assertTrue(directory.lockGlobalState());
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b042e3c6a54..d4b8511ee91 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -167,7 +167,7 @@ private StreamsConfig createConfig(final boolean enableEoS) 
{
     @Before
     public void setup() {
         consumer.assign(asList(partition1, partition2));
-        stateDirectory = new StateDirectory(createConfig(false), new 
MockTime());
+        stateDirectory = new StateDirectory(createConfig(false), new 
MockTime(), true);
     }
 
     @After
@@ -968,7 +968,7 @@ public void 
shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
     }
 
     @Test
-    public void 
shouldWrapProducerFencedExceptionWithTaskMigragedExceptionForBeginTransaction() 
{
+    public void 
shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() 
{
         task = createStatelessTask(createConfig(true));
         producer.fenceProducer();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11474a38663..42e22f53c99 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -112,7 +112,7 @@
     private final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
-    private final StateDirectory stateDirectory = new StateDirectory(config, 
mockTime);
+    private final StateDirectory stateDirectory = new StateDirectory(config, 
mockTime, true);
     private final ConsumedInternal<Object, Object> consumed = new 
ConsumedInternal<>();
 
     private UUID processId = UUID.randomUUID();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ca059b4c60b..9679429b66a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -99,7 +99,7 @@ public void before() {
         final ProcessorTopology processorTopology = 
topology.getInternalBuilder(applicationId).build();
 
         tasks = new HashMap<>();
-        stateDirectory = new StateDirectory(streamsConfig, new MockTime());
+        stateDirectory = new StateDirectory(streamsConfig, new MockTime(), 
true);
 
         taskOne = createStreamsTask(streamsConfig, clientSupplier, 
processorTopology, new TaskId(0, 0));
         taskOne.initializeStateStores();
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a11ae6b8df0..a90afe7918a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -246,6 +246,8 @@ private TopologyTestDriver(final InternalTopologyBuilder builder,
 
         processorTopology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
+                (globalTopology != null && globalTopology.hasPersistentGlobalStore());
 
         final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
@@ -256,7 +258,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder,
         };
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
+        stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
 
         final MetricConfig metricConfig = new MetricConfig()
             .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 3e95c731d9e..9f1c3a7a369 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.io.File;
 import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -41,6 +42,8 @@
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -72,6 +75,7 @@
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -773,14 +777,18 @@ public void shouldReturnAllStoresNames() {
     }
 
     private void setup() {
+        setup(Stores.inMemoryKeyValueStore("aggStore"));
+    }
+
+    private void setup(final KeyValueBytesStoreSupplier storeSupplier) {
         final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");
         topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
         topology.addStateStore(Stores.keyValueStoreBuilder(
-            Stores.inMemoryKeyValueStore("aggStore"),
-            Serdes.String(),
-            Serdes.Long()),
-            "aggregator");
+                storeSupplier,
+                Serdes.String(),
+                Serdes.Long()),
+                "aggregator");
         topology.addSink("sinkProcessor", "result-topic", "aggregator");
 
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
@@ -1070,4 +1078,25 @@ public void 
shouldThrowPatternNotValidForTopicNameException() {
             assertEquals(str, exception.getMessage());
         }
     }
+
+    @Test
+    public void shouldNotCreateStateDirectoryForStatelessTopology() {
+        setup();
+        final String stateDir = 
config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final File appDir = new File(stateDir, 
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertFalse(appDir.exists());
+    }
+
+    @Test
+    public void shouldCreateStateDirectoryForStatefulTopology() {
+        setup(Stores.persistentKeyValueStore("aggStore"));
+        final String stateDir = 
config.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final File appDir = new File(stateDir, 
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG));
+
+        assertTrue(appDir.exists());
+        assertTrue(appDir.isDirectory());
+
+        final TaskId taskId = new TaskId(0, 0);
+        assertTrue(new File(appDir, taskId.toString()).exists());
+    }
 }


 

> Streams should not create state store directories unless they are needed
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7367
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7367
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>              Labels: newbie
>
> Streams currently unconditionally creates state store directories, even if 
> the topology is stateless.
> This can be a problem running Streams in an environment without access to 
> disk.


