UladzislauBlok commented on code in PR #21566:
URL: https://github.com/apache/kafka/pull/21566#discussion_r2848368911


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1376,6 +1376,11 @@ private static HostInfo parseHostInfo(final String 
endPoint) {
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
         if (setState(State.REBALANCING)) {
+            final Long dirMaxAgeMs = 
applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG);
+            if (dirMaxAgeMs != 
StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED) {

Review Comment:
   I didn't add test for this one, because it's super simple. I can add one if 
anyone think that will be usefull



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -603,6 +603,55 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
         maybeCleanEmptyNamedTopologyDirs(true);
     }
 
+    /**
+     * Purges local state directories and checkpoint files during application 
startup.
+     *
+     * @param dirMaxAgeMs the time-based threshold in milliseconds. Only state 
directories
+     * and checkpoint files that have not been modified for at least
+     * this amount of time (corresponding to the
+     * {@code state.cleanup.dir.max.age.ms} property) will be removed.
+     */
+    public synchronized void cleanOutdatedDirsOnStartup(final long 
dirMaxAgeMs) {
+        try {
+            cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+        } catch (final Exception e) {
+            throw new StreamsException(e);
+        }
+    }
+
+    private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) 
throws Exception {
+        if (!lockedTasksToOwner.isEmpty()) {
+            log.warn("Found some still-locked task directories when cleaning 
outdated directories");

Review Comment:
   That shouldn't be the case, so I wasn't sure log warn / error or throw an 
exception



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to