[ https://issues.apache.org/jira/browse/KAFKA-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701041#comment-16701041 ]
ASF GitHub Bot commented on KAFKA-7367: --------------------------------------- mjsax closed pull request #5696: KAFKA-7367: Streams should not create state store directories unless they are needed URL: https://github.com/apache/kafka/pull/5696 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bbda11de150..c29b7bc4244 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -644,12 +644,6 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId)); this.log = logContext.logger(getClass()); - try { - stateDirectory = new StateDirectory(config, time); - } catch (final ProcessorStateException fatal) { - throw new StreamsException(fatal); - } - final MetricConfig metricConfig = new MetricConfig() .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -664,7 +658,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, internalTopologyBuilder.rewriteTopology(config); // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception - internalTopologyBuilder.build(); + final ProcessorTopology taskTopology = internalTopologyBuilder.build(); streamsMetadataState = new StreamsMetadataState( internalTopologyBuilder, @@ -680,6 +674,14 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, } final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 
0 : 1)); + final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() || + (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore()); + + try { + stateDirectory = new StateDirectory(config, time, createStateDirectory); + } catch (final ProcessorStateException fatal) { + throw new StreamsException(fatal); + } final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener(); GlobalStreamThread.State globalThreadState = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 8fcbbb44302..57af1f3f340 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -152,6 +152,24 @@ boolean isRepartitionTopic(final String topic) { return repartitionTopics.contains(topic); } + public boolean hasPersistentLocalStore() { + for (final StateStore store : stateStores) { + if (store.persistent()) { + return true; + } + } + return false; + } + + public boolean hasPersistentGlobalStore() { + for (final StateStore store : globalStateStores) { + if (store.persistent()) { + return true; + } + } + return false; + } + private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) { if (children == null || children.isEmpty()) { return ""; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a3227ca0c7d..f5c4c31d083 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileFilter; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; @@ -50,6 +49,7 @@ private static final Logger log = LoggerFactory.getLogger(StateDirectory.class); private final File stateDir; + private final boolean createStateDirectory; private final HashMap<TaskId, FileChannel> channels = new HashMap<>(); private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>(); private final Time time; @@ -71,19 +71,21 @@ * Ensures that the state base directory as well as the application's sub-directory are created. * * @throws ProcessorStateException if the base state directory or application state directory does not exist - * and could not be created + * and could not be created when createStateDirectory is enabled. 
*/ public StateDirectory(final StreamsConfig config, - final Time time) { + final Time time, + final boolean createStateDirectory) { this.time = time; + this.createStateDirectory = createStateDirectory; final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG); final File baseDir = new File(stateDirName); - if (!baseDir.exists() && !baseDir.mkdirs()) { + if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) { throw new ProcessorStateException( String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName)); } stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); - if (!stateDir.exists() && !stateDir.mkdir()) { + if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir()) { throw new ProcessorStateException( String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath())); } @@ -96,7 +98,7 @@ public StateDirectory(final StreamsConfig config, */ public File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); - if (!taskDir.exists() && !taskDir.mkdir()) { + if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) { throw new ProcessorStateException( String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); } @@ -110,7 +112,7 @@ public File directoryForTask(final TaskId taskId) { */ File globalStateDir() { final File dir = new File(stateDir, "global"); - if (!dir.exists() && !dir.mkdir()) { + if (createStateDirectory && !dir.exists() && !dir.mkdir()) { throw new ProcessorStateException( String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath())); } @@ -128,6 +130,9 @@ private String logPrefix() { * @throws IOException */ synchronized boolean lock(final TaskId taskId) throws IOException { + if (!createStateDirectory) { + return true; + } final File lockFile; // we already have the lock so bail out here @@ -169,6 +174,10 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } synchronized boolean lockGlobalState() throws IOException { + if (!createStateDirectory) { + return true; + } + if (globalStateLock != null) { log.trace("{} Found cached state dir lock for the global task", logPrefix()); return true; @@ -234,7 +243,9 @@ public synchronized void clean() { throw new StreamsException(e); } try { - Utils.delete(globalStateDir().getAbsoluteFile()); + if (stateDir.exists()) { + Utils.delete(globalStateDir().getAbsoluteFile()); + } } catch (final IOException e) { log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e); throw new StreamsException(e); @@ -320,12 +331,8 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs, * @return The list of all the existing local directories for stream tasks */ File[] listTaskDirectories() { - return stateDir.listFiles(new FileFilter() { - @Override - public boolean accept(final File pathname) { - return pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches(); - } - }); + return !stateDir.exists() ? 
new File[0] : + stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches()); } private FileChannel getOrCreateFileChannel(final TaskId taskId, @@ -344,6 +351,4 @@ private FileLock tryLock(final FileChannel channel) throws IOException { } } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index fde5bff9728..3ca0dcf9b29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -27,27 +30,37 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.StreamThread; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import java.io.File; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -57,6 +70,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @@ -79,6 +93,9 @@ private KafkaStreams globalStreams; private Properties props; + @Rule + public TestName testName = new TestName(); + @Before public void before() { props = new Properties(); @@ -125,8 +142,8 @@ public void testStateChanges() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); - Assert.assertEquals(globalStreams.state(), KafkaStreams.State.CREATED); - Assert.assertEquals(stateListener.numChanges, 0); + Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + Assert.assertEquals(0, stateListener.numChanges); globalStreams.start(); TestUtils.waitForCondition( @@ -136,7 +153,7 @@ 
public void testStateChanges() throws InterruptedException { globalStreams.close(); - Assert.assertEquals(globalStreams.state(), KafkaStreams.State.NOT_RUNNING); + Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); } @Test @@ -158,7 +175,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception builder.globalTable("anyTopic"); final List<Node> nodes = asList(new Node(0, "localhost", 8121)); final Cluster cluster = new Cluster("mockClusterId", nodes, - Collections.emptySet(), Collections.<String>emptySet(), + Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), nodes.get(0)); final MockClientSupplier clientSupplier = new MockClientSupplier(); clientSupplier.setClusterForAdminClient(cluster); @@ -349,6 +366,7 @@ public void testCannotStartTwice() { try { globalStreams.start(); + fail("Should throw an IllegalStateException"); } catch (final IllegalStateException e) { // this is ok } finally { @@ -575,6 +593,141 @@ public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException { } } + @Test + public void statelessTopologyShouldNotCreateStateDirectory() throws Exception { + final String inputTopic = testName.getMethodName() + "-input"; + final String outputTopic = testName.getMethodName() + "-output"; + CLUSTER.createTopics(inputTopic, outputTopic); + + final Topology topology = new Topology(); + topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic) + .addProcessor("process", () -> new AbstractProcessor<String, String>() { + @Override + public void process(final String key, final String value) { + if (value.length() % 2 == 0) { + context().forward(key, key + value); + } + } + }, "source") + .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process"); + startStreamsAndCheckDirExists(topology, Collections.singleton(inputTopic), outputTopic, false); + } + + @Test + public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception { + final String inputTopic = testName.getMethodName() + "-input"; + final String outputTopic = testName.getMethodName() + "-output"; + final String globalTopicName = testName.getMethodName() + "-global"; + final String storeName = testName.getMethodName() + "-counts"; + final String globalStoreName = testName.getMethodName() + "-globalStore"; + final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false); + startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, false); + } + + @Test + public void statefulTopologyShouldCreateStateDirectory() throws Exception { + final String inputTopic = testName.getMethodName() + "-input"; + final String outputTopic = testName.getMethodName() + "-output"; + final String globalTopicName = testName.getMethodName() + "-global"; + final String storeName = testName.getMethodName() + "-counts"; + final String globalStoreName = testName.getMethodName() + "-globalStore"; + final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true); + startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true); + } + + @SuppressWarnings("unchecked") + private Topology getStatefulTopology(final String inputTopic, + final String outputTopic, + final String globalTopicName, + final String storeName, + final String globalStoreName, + final boolean isPersistentStore) throws Exception { + 
CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName); + final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(isPersistentStore ? + Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName), + Serdes.String(), Serdes.Long()); + final Topology topology = new Topology(); + topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic) + .addProcessor("process", () -> new AbstractProcessor<String, String>() { + @Override + public void process(final String key, final String value) { + final KeyValueStore<String, Long> kvStore = + (KeyValueStore<String, Long>) context().getStateStore(storeName); + kvStore.put(key, 5L); + + context().forward(key, "5"); + context().commit(); + } + }, "source") + .addStateStore(storeBuilder, "process") + .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process"); + + final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder( + isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), + Serdes.String(), Serdes.String()).withLoggingDisabled(); + topology.addGlobalStore(globalStoreBuilder, + "global", + Serdes.String().deserializer(), + Serdes.String().deserializer(), + globalTopicName, + globalTopicName + "-processor", + new MockProcessorSupplier()); + return topology; + } + + private void startStreamsAndCheckDirExists(final Topology topology, + final Collection<String> inputTopics, + final String outputTopic, + final boolean shouldFilesExist) throws Exception { + final File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString(5)); + final Path basePath = baseDir.toPath(); + if (!baseDir.exists()) { + Files.createDirectory(basePath); + } + // changing the path of state directory to make sure that it should not clash with other test cases. 
+ final Properties localProps = new Properties(); + localProps.putAll(props); + localProps.put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getAbsolutePath()); + + final KafkaStreams streams = new KafkaStreams(topology, localProps); + streams.start(); + + for (final String topic : inputTopics) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, + Collections.singletonList(new KeyValue<>("A", "A")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + System.currentTimeMillis()); + } + + IntegrationTestUtils.readKeyValues(outputTopic, + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + outputTopic + "-group", + StringDeserializer.class, + StringDeserializer.class), + 5000, 1); + + try { + final List<Path> files = Files.find(basePath, 999, (p, bfa) -> !p.equals(basePath)).collect(Collectors.toList()); + if (shouldFilesExist && files.isEmpty()) { + Assert.fail("Files should have existed, but it didn't: " + files); + } + if (!shouldFilesExist && !files.isEmpty()) { + Assert.fail("Files should not have existed, but it did: " + files); + } + } catch (final IOException e) { + Assert.fail("Couldn't read the state directory : " + baseDir.getPath()); + } finally { + streams.close(); + streams.cleanUp(); + Utils.delete(baseDir); + } + } + private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException { final File taskDir = new File(appDir, "0_0"); TestUtils.waitForCondition( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 81938f82d1a..c3c45db7de9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -132,7 +132,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception { createStateForRestoration(INPUT_STREAM); setCommittedOffset(INPUT_STREAM, offsetLimitDelta); - final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime()); + final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true); new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint")) .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed)); new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint")) @@ -199,7 +199,7 @@ public void shouldRestoreStateFromChangelogTopic() throws Exception { createStateForRestoration(APPID + "-store-changelog"); createStateForRestoration(INPUT_STREAM); - final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime()); + final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true); new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint")) .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 0), (long) offsetCheckpointed)); new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint")) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 
4ed44be47f2..b3afa24a325 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -152,7 +152,7 @@ public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOExcep expect(store4.name()).andReturn(storeName4).anyTimes(); EasyMock.replay(store4); - final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime()); + final StateDirectory stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true); final AbstractTask task = createTask( consumer, new HashMap<StateStore, String>() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 8789e2ba29a..8b02b3e66d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -112,7 +112,7 @@ public void before() throws IOException { put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); } }); - stateDirectory = new StateDirectory(streamsConfig, time); + stateDirectory = new StateDirectory(streamsConfig, time, true); consumer = new MockConsumer<>(OffsetResetStrategy.NONE); stateManager = new GlobalStateManagerImpl( new LogContext("test"), @@ -139,7 +139,7 @@ public void shouldLockGlobalStateDirectory() { @Test(expected = LockException.class) public void shouldThrowLockExceptionIfCantGetLock() throws IOException { - final StateDirectory stateDir = new StateDirectory(streamsConfig, time); + final StateDirectory stateDir = new StateDirectory(streamsConfig, time, true); try { stateDir.lockGlobalState(); stateManager.initialize(); @@ -357,8 +357,8 @@ public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() { @Test public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException { stateManager.initialize(); - stateManager.close(Collections.<TopicPartition, Long>emptyMap()); - final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime()); + stateManager.close(Collections.emptyMap()); + final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true); try { // should be able to get the lock now as it should've been released in close assertTrue(stateDir.lockGlobalState()); @@ -419,7 +419,7 @@ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOExcept } catch (final StreamsException e) { // expected } - final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime()); + final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true); try { // should be able to get the lock now as it should've been released assertTrue(stateDir.lockGlobalState()); @@ -511,7 +511,7 @@ public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir( new LogContext("mock"), topology, consumer, - new StateDirectory(streamsConfig, time) { + new StateDirectory(streamsConfig, time, true) { @Override public boolean lockGlobalState() throws IOException { throw new IOException("KABOOM!"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 37a6fdba453..387179b56b8 100644 
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -107,7 +107,7 @@ public String newStoreName(final String prefix) { globalStreamThread = new GlobalStreamThread(builder.rewriteTopology(config).buildGlobalStateTopology(), config, mockConsumer, - new StateDirectory(config, time), + new StateDirectory(config, time, true), 0, new Metrics(), new MockTime(), @@ -140,7 +140,7 @@ public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() { globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(), config, mockConsumer, - new StateDirectory(config, time), + new StateDirectory(config, time, true), 0, new Metrics(), new MockTime(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index fbcb2c85b47..3d91ee08136 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -98,7 +98,7 @@ public void setup() { put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath()); } - }), new MockTime()); + }), new MockTime(), true); checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); checkpoint = new OffsetCheckpoint(checkpointFile); } @@ -565,7 +565,7 @@ public void close() { } @Test - public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException { + public void shouldFlushAllStoresEvenIfStoreThrowsException() throws IOException { final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Collections.singleton(changelogTopicPartition), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 587cae20179..11050fe6f55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -38,7 +38,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; @@ -54,8 +56,10 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ProcessorTopologyTest { @@ -367,6 +371,58 @@ public void shouldAddHeaders() { assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", HEADERS, 30L); } + @Test + public void statelessTopologyShouldNotHavePersistentStore() { + final TopologyWrapper 
topology = new TopologyWrapper(); + final ProcessorTopology processorTopology = topology.getInternalBuilder("anyAppId").build(); + assertFalse(processorTopology.hasPersistentLocalStore()); + assertFalse(processorTopology.hasPersistentGlobalStore()); + } + + @Test + public void inMemoryStoreShouldNotResultInPersistentLocalStore() { + final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.inMemoryKeyValueStore("my-store")); + assertFalse(processorTopology.hasPersistentLocalStore()); + } + + @Test + public void persistentLocalStoreShouldBeDetected() { + final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.persistentKeyValueStore("my-store")); + assertTrue(processorTopology.hasPersistentLocalStore()); + } + + @Test + public void inMemoryStoreShouldNotResultInPersistentGlobalStore() { + final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store")); + assertFalse(processorTopology.hasPersistentGlobalStore()); + } + + @Test + public void persistentGlobalStoreShouldBeDetected() { + final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store")); + assertTrue(processorTopology.hasPersistentGlobalStore()); + } + + private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) { + final TopologyWrapper topology = new TopologyWrapper(); + final String processor = "processor"; + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()); + topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic") + .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source") + .addStateStore(storeBuilder, processor); + return topology.getInternalBuilder("anyAppId").build(); + } + + private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) { + final TopologyWrapper topology = new TopologyWrapper(); + final StoreBuilder<KeyValueStore<String, String>> storeBuilder = + Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled(); + topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic", "processor", + define(new StatefulProcessor(storeSupplier.name()))); + return topology.getInternalBuilder("anyAppId").build(); + } + private void assertNextOutputRecord(final String topic, final String key, final String value) { @@ -416,12 +472,7 @@ private void assertNoOutputRecord(final String topic) { } private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) { - return new StreamPartitioner<Object, Object>() { - @Override - public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) { - return partition; - } - }; + return (topic, key, value, numPartitions) -> partition; } private Topology createSimpleTopology(final int partition) { @@ -619,12 +670,7 @@ public void close() { } private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) { - return new ProcessorSupplier<K, V>() { - @Override - public Processor<K, V> get() { - return processor; - } - }; + return () -> processor; } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 0c24e2d637f..17b146f6fd0 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -160,7 +160,7 @@ public void setup() throws Exception { new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0]) )); baseDir = TestUtils.tempDirectory(); - stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime()); + stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime(), true); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index e14d01077ac..edb02c6224f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -52,9 +52,8 @@ private StateDirectory directory; private File appDir; - @Before - public void before() { - stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5)); + private void initializeStateDirectory(final boolean createStateDirectory) { + stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5)); directory = new StateDirectory( new StreamsConfig(new Properties() { { @@ -63,10 +62,15 @@ public void before() { put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath()); } }), - time); + time, createStateDirectory); appDir = new File(stateDir, applicationId); } + @Before + public void before() { + initializeStateDirectory(true); + } + @After public void cleanup() throws IOException { Utils.delete(stateDir); @@ -138,7 +142,7 @@ public void shouldNotLockDeletedDirectory() throws IOException { } @Test - public void shouldLockMulitpleTaskDirectories() throws IOException { + public void shouldLockMultipleTaskDirectories() throws IOException { final TaskId taskId = new TaskId(0, 0); final File task1Dir = directory.directoryForTask(taskId); final TaskId taskId2 = new TaskId(1, 0); @@ -254,7 +258,7 @@ public void shouldCreateDirectoriesIfParentDoesntExist() { put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath()); } }), - time); + time, true); final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0)); assertTrue(stateDir.exists()); assertTrue(taskDir.exists()); @@ -296,14 +300,11 @@ public void shouldUnlockGlobalStateDirectory() throws IOException { public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException { final TaskId taskId = new TaskId(0, 0); final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>(); - final Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - directory.lock(taskId); - } catch (final IOException e) { - exceptionOnThread.set(e); - } + final Thread thread = new Thread(() -> { + try { + directory.lock(taskId); + } catch (final IOException e) { + exceptionOnThread.set(e); } }); thread.start(); @@ -318,17 +319,14 @@ public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, I final CountDownLatch lockLatch = new CountDownLatch(1); final CountDownLatch unlockLatch = new CountDownLatch(1); final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>(); - final Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - directory.lock(taskId); - lockLatch.countDown(); - unlockLatch.await(); - directory.unlock(taskId); - } 
catch (final Exception e) { - exceptionOnThread.set(e); - } + final Thread thread = new Thread(() -> { + try { + directory.lock(taskId); + lockLatch.countDown(); + unlockLatch.await(); + directory.unlock(taskId); + } catch (final Exception e) { + exceptionOnThread.set(e); } }); thread.start(); @@ -358,4 +356,39 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() { files = Arrays.asList(appDir.listFiles()); assertEquals(0, files.size()); } + + @Test + public void shouldNotCreateBaseDirectory() { + initializeStateDirectory(false); + assertFalse(stateDir.exists()); + assertFalse(appDir.exists()); + } + + @Test + public void shouldNotCreateTaskStateDirectory() { + initializeStateDirectory(false); + final TaskId taskId = new TaskId(0, 0); + final File taskDirectory = directory.directoryForTask(taskId); + assertFalse(taskDirectory.exists()); + } + + @Test + public void shouldNotCreateGlobalStateDirectory() { + initializeStateDirectory(false); + final File globalStateDir = directory.globalStateDir(); + assertFalse(globalStateDir.exists()); + } + + @Test + public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException { + initializeStateDirectory(false); + final TaskId taskId = new TaskId(0, 0); + assertTrue(directory.lock(taskId)); + } + + @Test + public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws IOException { + initializeStateDirectory(false); + assertTrue(directory.lockGlobalState()); + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index b042e3c6a54..d4b8511ee91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -167,7 +167,7 @@ private StreamsConfig createConfig(final boolean enableEoS) { @Before public void setup() { consumer.assign(asList(partition1, partition2)); - stateDirectory = new StateDirectory(createConfig(false), new MockTime()); + stateDirectory = new StateDirectory(createConfig(false), new MockTime(), true); } @After @@ -968,7 +968,7 @@ public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() { } @Test - public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionForBeginTransaction() { + public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() { task = createStatelessTask(createConfig(true)); producer.fenceProducer(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 11474a38663..42e22f53c99 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -112,7 +112,7 @@ private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); private final StreamsConfig config = new StreamsConfig(configProps(false)); private final String stateDir = TestUtils.tempDirectory().getPath(); - private final StateDirectory stateDirectory = new StateDirectory(config, mockTime); + private final StateDirectory stateDirectory = new StateDirectory(config, mockTime, true); private final ConsumedInternal<Object, Object> consumed = 
new ConsumedInternal<>(); private UUID processId = UUID.randomUUID(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index ca059b4c60b..9679429b66a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -99,7 +99,7 @@ public void before() { final ProcessorTopology processorTopology = topology.getInternalBuilder(applicationId).build(); tasks = new HashMap<>(); - stateDirectory = new StateDirectory(streamsConfig, new MockTime()); + stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true); taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0)); taskOne.initializeStateStores(); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index a11ae6b8df0..a90afe7918a 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -246,6 +246,8 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, processorTopology = internalTopologyBuilder.build(null); globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); + final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() || + (globalTopology != null && globalTopology.hasPersistentGlobalStore()); final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) { @@ -256,7 +258,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, }; final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime); + stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory); final MetricConfig metricConfig = new MetricConfig() .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 3e95c731d9e..9f1c3a7a369 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams; +import java.io.File; import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -41,6 +42,8 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; @@ -72,6 +75,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; 
import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -773,14 +777,18 @@ public void shouldReturnAllStoresNames() { } private void setup() { + setup(Stores.inMemoryKeyValueStore("aggStore")); + } + + private void setup(final KeyValueBytesStoreSupplier storeSupplier) { final Topology topology = new Topology(); topology.addSource("sourceProcessor", "input-topic"); topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor"); topology.addStateStore(Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("aggStore"), - Serdes.String(), - Serdes.Long()), - "aggregator"); + storeSupplier, + Serdes.String(), + Serdes.Long()), + "aggregator"); topology.addSink("sinkProcessor", "result-topic", "aggregator"); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); @@ -1070,4 +1078,25 @@ public void shouldThrowPatternNotValidForTopicNameException() { assertEquals(str, exception.getMessage()); } } + + @Test + public void shouldNotCreateStateDirectoryForStatelessTopology() { + setup(); + final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG); + final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)); + assertFalse(appDir.exists()); + } + + @Test + public void shouldCreateStateDirectoryForStatefulTopology() { + setup(Stores.persistentKeyValueStore("aggStore")); + final String stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG); + final File appDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)); + + assertTrue(appDir.exists()); + assertTrue(appDir.isDirectory()); + + final TaskId taskId = new TaskId(0, 0); + assertTrue(new File(appDir, taskId.toString()).exists()); + } }

> Streams should not create state store directories unless they are needed
> ------------------------------------------------------------------------
>
> Key: KAFKA-7367
> URL: https://issues.apache.org/jira/browse/KAFKA-7367
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: Kamal Chandraprakash
> Priority: Major
> Labels: newbie
>
> Streams currently unconditionally creates state store directories, even if the topology is stateless.
> This can be a problem running Streams in an environment without access to disk.
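
For anyone skimming the flattened diff above, the crux of the patch is that KafkaStreams (and TopologyTestDriver) now only creates the state directory when the built ProcessorTopology contains at least one store whose StateStore#persistent() returns true, either locally or in the global topology; the new hasPersistentLocalStore()/hasPersistentGlobalStore() helpers simply scan the registered stores for that flag. The minimal, self-contained sketch below shows how the two kinds of stores expose that flag through the public Stores API. The store names and serdes are illustrative choices, not values taken from the patch.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class StorePersistenceCheck {

        public static void main(final String[] args) {
            // RocksDB-backed store: StateStore#persistent() reports true, so a topology
            // containing it still needs, and still gets, task directories under state.dir.
            final StoreBuilder<KeyValueStore<String, Long>> persistentBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("persistent-counts"),
                                            Serdes.String(), Serdes.Long());

            // Heap-backed store: persistent() reports false. If every local and global
            // store in the topology looks like this, the patched code skips creating
            // the state directory altogether.
            final StoreBuilder<KeyValueStore<String, Long>> inMemoryBuilder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("in-memory-counts"),
                                            Serdes.String(), Serdes.Long());

            System.out.println(persistentBuilder.build().persistent()); // prints: true
            System.out.println(inMemoryBuilder.build().persistent());   // prints: false
        }
    }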
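
From the application's point of view, as the quoted ticket description notes, this means a topology without persistent stores can run in an environment with no writable disk. Below is a minimal sketch of such a stateless application, assuming the patch is applied; the topic names, application id, bootstrap address, and state.dir path are illustrative assumptions rather than values from the PR or the ticket.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class StatelessFilterApp {

        public static void main(final String[] args) {
            // A purely stateless topology: consume, filter, forward. No state stores.
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> input = builder.stream("input-topic");
            input.filter((key, value) -> value != null && value.length() % 2 == 0)
                 .to("output-topic");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-filter-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/streams-stateless-demo");

            // With this change, neither the constructor nor start() should create
            // /tmp/streams-stateless-demo/stateless-filter-app on disk, because the
            // built ProcessorTopology contains no persistent local or global store.
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

Before the patch, the StateDirectory constructor created the base and per-application directories eagerly, so even a topology like this one would fail at KafkaStreams construction time on a read-only filesystem.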