This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 3d399a5  KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
3d399a5 is described below

commit 3d399a51da3415ea3a58760402ba665950390732
Author: voffcheg109 <[email protected]>
AuthorDate: Thu Oct 8 23:20:21 2020 +0400

    KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
    
    Add additional warning logs and improve existing log messages for `FileNotFoundException` and for the case where /tmp is used as the state directory.
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java   | 4 +++-
 .../kafka/streams/processor/internals/ProcessorStateManager.java    | 6 +++++-
 .../apache/kafka/streams/processor/internals/StateDirectory.java    | 4 ++++
 .../streams/processor/internals/ProcessorStateManagerTest.java      | 5 ++++-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index bd3aa3a..2c57a0a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -446,7 +446,9 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
         try {
             checkpointFile.write(filteredOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to {} for global 
stores: {}", checkpointFile, e);
+            log.warn("Failed to write offset checkpoint file to {} for global 
stores: {}." +
+                " This may occur if OS cleaned the state.dir in case when it 
is located in the (default) /tmp/kafka-streams directory." +
+                " Changing the location of state.dir may resolve the problem", 
checkpointFile, e);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 58a21fc..3dcdaaf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -603,7 +603,11 @@ public class ProcessorStateManager implements StateManager 
{
         try {
             checkpointFile.write(checkpointingOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to [{}]", 
checkpointFile, e);
+            log.warn("Failed to write offset checkpoint file to [{}]." +
+                " This may occur if OS cleaned the state.dir in case when it 
located in /tmp directory." +
+                " This may also occur due to running multiple instances on the 
same machine using the same state dir." +
+                " Changing the location of state.dir may resolve the problem.",
+                checkpointFile, e);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 35f937a..861a971 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -98,6 +98,10 @@ public class StateDirectory {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't 
be created", stateDir.getPath()));
         }
+        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
+            log.warn("Using /tmp directory in the state.dir property can cause 
failures with writing the checkpoint file" +
+                " due to the fact that this directory can be cleared by the 
OS");
+        }
     }
 
     /**
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a652b17..4f077a0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -776,7 +776,10 @@ public class ProcessorStateManagerTest {
             for (final LogCaptureAppender.Event event : appender.getEvents()) {
                 if ("WARN".equals(event.getLevel())
                     && 
event.getMessage().startsWith("process-state-manager-test Failed to write 
offset checkpoint file to [")
-                    && event.getMessage().endsWith(".checkpoint]")
+                    && event.getMessage().endsWith(".checkpoint]." +
+                        " This may occur if OS cleaned the state.dir in case 
when it located in /tmp directory." +
+                        " This may also occur due to running multiple 
instances on the same machine using the same state dir." +
+                        " Changing the location of state.dir may resolve the 
problem.")
                     && 
event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) {
 
                     foundExpectedLogMessage = true;

Reply via email to