This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 3d399a5 KAFKA-7334: Suggest changing config for state.dir in case of
FileNotFoundException (#9380)
3d399a5 is described below
commit 3d399a51da3415ea3a58760402ba665950390732
Author: voffcheg109 <[email protected]>
AuthorDate: Thu Oct 8 23:20:21 2020 +0400
KAFKA-7334: Suggest changing config for state.dir in case of
FileNotFoundException (#9380)
Add warning logs and improve existing log messages for
`FileNotFoundException`, and warn when /tmp is used as the state directory.
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 4 +++-
.../kafka/streams/processor/internals/ProcessorStateManager.java | 6 +++++-
.../apache/kafka/streams/processor/internals/StateDirectory.java | 4 ++++
.../streams/processor/internals/ProcessorStateManagerTest.java | 5 ++++-
4 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index bd3aa3a..2c57a0a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -446,7 +446,9 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
try {
checkpointFile.write(filteredOffsets);
} catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to {} for global
stores: {}", checkpointFile, e);
+ log.warn("Failed to write offset checkpoint file to {} for global
stores: {}." +
+ " This may occur if OS cleaned the state.dir in case when it
is located in the (default) /tmp/kafka-streams directory." +
+ " Changing the location of state.dir may resolve the problem",
checkpointFile, e);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 58a21fc..3dcdaaf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -603,7 +603,11 @@ public class ProcessorStateManager implements StateManager
{
try {
checkpointFile.write(checkpointingOffsets);
} catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to [{}]",
checkpointFile, e);
+ log.warn("Failed to write offset checkpoint file to [{}]." +
+ " This may occur if OS cleaned the state.dir in case when it
located in /tmp directory." +
+ " This may also occur due to running multiple instances on the
same machine using the same state dir." +
+ " Changing the location of state.dir may resolve the problem.",
+ checkpointFile, e);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 35f937a..861a971 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -98,6 +98,10 @@ public class StateDirectory {
throw new ProcessorStateException(
String.format("state directory [%s] doesn't exist and couldn't
be created", stateDir.getPath()));
}
+ if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
+ log.warn("Using /tmp directory in the state.dir property can cause
failures with writing the checkpoint file" +
+ " due to the fact that this directory can be cleared by the
OS");
+ }
}
/**
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a652b17..4f077a0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -776,7 +776,10 @@ public class ProcessorStateManagerTest {
for (final LogCaptureAppender.Event event : appender.getEvents()) {
if ("WARN".equals(event.getLevel())
&&
event.getMessage().startsWith("process-state-manager-test Failed to write
offset checkpoint file to [")
- && event.getMessage().endsWith(".checkpoint]")
+ && event.getMessage().endsWith(".checkpoint]." +
+ " This may occur if OS cleaned the state.dir in case
when it located in /tmp directory." +
+ " This may also occur due to running multiple
instances on the same machine using the same state dir." +
+ " Changing the location of state.dir may resolve the
problem.")
&&
event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) {
foundExpectedLogMessage = true;