This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22d520adae1 KAFKA-19943: Implement state store clean up on start 
(#21566)
22d520adae1 is described below

commit 22d520adae1211d6b0939acb06e3bf9bf50696be
Author: Uladzislau Blok <[email protected]>
AuthorDate: Mon Mar 9 19:12:47 2026 +0100

    KAFKA-19943: Implement state store clean up on start (#21566)
    
    Implementation for KIP-1259
    
    Adds a new property to set the maximum directory age, and removes directories on startup if
    `current time - max age > last modified time`.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 docs/streams/developer-guide/config-streams.md     | 20 +++++++++--
 docs/streams/upgrade-guide.md                      |  2 ++
 .../org/apache/kafka/streams/KafkaStreams.java     |  5 +++
 .../org/apache/kafka/streams/StreamsConfig.java    | 11 ++++++
 .../processor/internals/StateDirectory.java        | 40 ++++++++++++++++++++++
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 37 ++++++++++++++++++++
 .../processor/internals/StateDirectoryTest.java    | 36 +++++++++++++++++++
 7 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.md 
b/docs/streams/developer-guide/config-streams.md
index 57662eb58b9..265e58f9c20 100644
--- a/docs/streams/developer-guide/config-streams.md
+++ b/docs/streams/developer-guide/config-streams.md
@@ -1039,8 +1039,24 @@ The amount of time in milliseconds to wait before 
deleting state when a partitio
 </td>  
 <td>
 
-`600000`
-</td> (10 minutes)
+`600000` (10 minutes)
+</td> </tr>
+<tr>
+<td>
+
+state.cleanup.dir.max.age.ms
+</td>
+<td>
+
+Low
+</td>
+<td>
+
+Time-based threshold for purging local state directories and checkpoint files 
during application startup. State directories that have not been modified for 
at least `state.cleanup.dir.max.age.ms` will be removed.
+</td>
+<td>
+
+`-1` (Disabled)
 </td> </tr>  
 <tr>  
 <td>
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index 072f72fd34a..afdf1e5c497 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -69,6 +69,8 @@ Kafka Streams now supports `ProcessingExceptionHandler` for 
global store/KTable
 
 The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, 
and `poll-ratio`, along with streams state updater metrics 
`active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and 
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling 
measurement window, the ratio of time this thread spends performing the given 
action (`{action}`) to the total elapsed time in that window. The effective 
window duration is determined by the metrics configurat [...]
 
+Kafka Streams can now purge local state directories and checkpoint files during application startup if they have not been modified for a configurable period of time. This is controlled via the new `state.cleanup.dir.max.age.ms` config, which is disabled (`-1`) by default. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup).
+
 ### Deprecation of streams-scala module (KIP-1244)
 
 The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is 
deprecated in 4.3.0 and will be removed in 5.0.
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b00623b32bb..7adb5ac73c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1388,6 +1388,11 @@ public class KafkaStreams implements AutoCloseable {
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
         if (setState(State.REBALANCING)) {
+            final long dirMaxAgeMs = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG);
+            if (dirMaxAgeMs != StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED) {
+                log.debug("Start cleaning outdated directories (max directory age is {}ms)", dirMaxAgeMs);
+                stateDirectory.cleanOutdatedDirsOnStartup(dirMaxAgeMs);
+            }
             log.debug("Initializing store offsets for existing local state");
             stateDirectory.initializeStartupStores(topologyMetadata, logContext, streamsMetrics);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f2f402034c9..7ad19d3562c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {
     public static final int DUMMY_THREAD_INDEX = 1;

     public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
+    public static final long STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED = -1;

     // We impose these limitations because client tags are encoded into the subscription info,
     // which is part of the group metadata message that is persisted into the internal topic.
@@ -767,6 +768,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
     private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least <code>state.cleanup.delay.ms</code> will be removed";

+    /** {@code state.cleanup.dir.max.age.ms} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG = "state.cleanup.dir.max.age.ms";
+    private static final String STATE_CLEANUP_DIR_MAX_AGE_MS_DOC = "Time-based threshold for purging local state directories and checkpoint files during application startup. State directories that have not been modified for at least <code>" + STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG + "</code> will be removed. Set to <code>-1</code> (the default) to disable this cleanup.";
+
     /** {@code state.dir} */
     @SuppressWarnings("WeakerAccess")
     public static final String STATE_DIR_CONFIG = "state.dir";
@@ -1266,6 +1272,11 @@ public class StreamsConfig extends AbstractConfig {
                     10 * 60 * 1000L,
                     Importance.LOW,
                     STATE_CLEANUP_DELAY_MS_DOC)
+            .define(STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG,
+                    Type.LONG, STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED,
+                    atLeast(STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED),
+                    Importance.LOW,
+                    STATE_CLEANUP_DIR_MAX_AGE_MS_DOC)
             .define(UPGRADE_FROM_CONFIG,
                     Type.STRING,
                     null,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 701ac8195d7..5b37c1bf427 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -646,6 +646,46 @@ public class StateDirectory implements AutoCloseable {
         maybeCleanEmptyNamedTopologyDirs(true);
     }

+    /**
+     * Purges outdated local task directories (state stores and checkpoint files) during application startup.
+     *
+     * @param dirMaxAgeMs value of {@code state.cleanup.dir.max.age.ms}; task directories not modified for longer are removed
+     * @throws IllegalArgumentException if {@code dirMaxAgeMs} is negative (a negative age would select every directory)
+     * @throws StreamsException if deleting any outdated directory fails (deletion of the others is still attempted)
+     */
+    public synchronized void cleanOutdatedDirsOnStartup(final long dirMaxAgeMs) {
+        if (dirMaxAgeMs < 0) {
+            throw new IllegalArgumentException("dirMaxAgeMs must be non-negative, but was " + dirMaxAgeMs);
+        }
+        try {
+            cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+        } catch (final Exception e) {
+            throw new StreamsException(e);
+        }
+    }
+
+    private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) throws Exception {
+        final long now = time.milliseconds(); // single cut-off, so it does not drift while deleting
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.file().getName();
+            final long ageMs = now - taskDir.file().lastModified();
+            if (ageMs > dirMaxAgeMs) {
+                log.info("Deleting outdated state directory {} as {}ms has elapsed from last update (max directory age is {}ms).", dirName, ageMs, dirMaxAgeMs);
+                try {
+                    Utils.delete(taskDir.file());
+                } catch (final IOException exception) {
+                    log.error("Failed to delete task directory {} with exception:", dirName, exception);
+                    firstException.compareAndSet(null, exception);
+                }
+            }
+        }
+        final Exception exception = firstException.get();
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
     /**
      * Cleans up any leftover named topology directories that are empty, if any exist
      * @param logExceptionAsWarn if true, an exception will be logged as a warning
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 28ca83ddb2b..857b53910f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -126,6 +126,7 @@ import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -1877,6 +1878,42 @@ public class KafkaStreamsTest {
         }
     }

+    @Test
+    public void shouldCallCleanOnStartupOnlyWhenEnabled() {
+        props.put(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG, 100); // any value other than -1 (disabled) enables the cleanup
+
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        prepareStreamThread(streamThreadTwo, 2);
+
+        try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
+                (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
+            try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+                assertEquals(1, constructed.constructed().size());
+                final StateDirectory stateDirectory = constructed.constructed().get(0);
+                streams.start();
+                verify(stateDirectory).cleanOutdatedDirsOnStartup(100); // configured max age is passed through to the state directory
+            }
+        }
+    }
+
+    @Test
+    public void shouldNotCallCleanOnStartupByDefault() {
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        prepareStreamThread(streamThreadTwo, 2);
+
+        try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
+                (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
+            try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+                assertEquals(1, constructed.constructed().size());
+                final StateDirectory stateDirectory = constructed.constructed().get(0);
+                streams.start();
+                verify(stateDirectory, never()).cleanOutdatedDirsOnStartup(anyLong()); // assumes props leaves the config at its -1 (disabled) default
+            }
+        }
+    }
+
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index b89628d225b..6e2250ead51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -910,6 +910,42 @@ public class StateDirectoryTest {
         assertFalse(store.isOpen());
     }

+    @Test
+    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusMaxDirAge() {
+        final TaskId task0 = new TaskId(0, 0);
+        final TaskId task1 = new TaskId(1, 0);
+        final TaskId task2 = new TaskId(2, 0);
+
+        final long dirMaxAgeMs = 60000L;
+        final long outdatedModifiedTime = time.milliseconds() - dirMaxAgeMs - 1000;
+
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task0), "store").mkdir());
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task1), "store").mkdir());
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task2), "store").mkdir());
+
+        final File dir0File = new File(appDir, toTaskDirString(task0));
+        assertTrue(dir0File.setLastModified(outdatedModifiedTime)); // fail fast if the filesystem refuses to backdate
+        final File dir1File = new File(appDir, toTaskDirString(task1));
+        assertTrue(dir1File.setLastModified(outdatedModifiedTime));
+        final File dir2File = new File(appDir, toTaskDirString(task2));
+
+        final TaskDirectory dir0 = new TaskDirectory(dir0File, null);
+        final TaskDirectory dir1 = new TaskDirectory(dir1File, null);
+        final TaskDirectory dir2 = new TaskDirectory(dir2File, null);
+
+        List<TaskDirectory> files = directory.listAllTaskDirectories();
+        assertEquals(Set.of(dir0, dir1, dir2), new HashSet<>(files));
+        files = directory.listNonEmptyTaskDirectories();
+        assertEquals(Set.of(dir0, dir1, dir2), new HashSet<>(files));
+
+        directory.cleanOutdatedDirsOnStartup(dirMaxAgeMs); // task0/task1 were backdated past dirMaxAgeMs; only task2 must survive
+
+        files = directory.listAllTaskDirectories();
+        assertEquals(Set.of(dir2), new HashSet<>(files));
+        files = directory.listNonEmptyTaskDirectories();
+        assertEquals(Set.of(dir2), new HashSet<>(files));
+    }
+
     private StateStore initializeStartupStores(final TaskId taskId, final boolean createTaskDir) {
         directory.initializeProcessId();
         final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);

Reply via email to