This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 650d93b KAFKA-12211: don't change perm for base/state dir when no
persistent store (#9904)
650d93b is described below
commit 650d93b2d8a90afda0c367b430ec1c8c49d09743
Author: Luke Chen <[email protected]>
AuthorDate: Thu Jan 21 03:37:56 2021 +0800
KAFKA-12211: don't change perm for base/state dir when no persistent store
(#9904)
If a user doesn't have any persistent stores, the base dir and state dir are
never created, so we should not try to set permissions on them.
Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang
<[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
.../processor/internals/StateDirectory.java | 45 ++++++++++++----------
.../processor/internals/StateDirectoryTest.java | 15 +++++---
2 files changed, 35 insertions(+), 25 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 2f04888..b98c3cb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -94,27 +94,32 @@ public class StateDirectory {
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String stateDirName =
config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
- if (this.hasPersistentStores && !baseDir.exists() &&
!baseDir.mkdirs()) {
- throw new ProcessorStateException(
- String.format("base state directory [%s] doesn't exist and
couldn't be created", stateDirName));
- }
stateDir = new File(baseDir, appId);
- if (this.hasPersistentStores && !stateDir.exists() &&
!stateDir.mkdir()) {
- throw new ProcessorStateException(
- String.format("state directory [%s] doesn't exist and couldn't
be created", stateDir.getPath()));
- }
- if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
- log.warn("Using /tmp directory in the state.dir property can cause
failures with writing the checkpoint file" +
- " due to the fact that this directory can be cleared by the
OS");
- }
- final Path basePath = Paths.get(baseDir.getPath());
- final Path statePath = Paths.get(stateDir.getPath());
- final Set<PosixFilePermission> perms =
PosixFilePermissions.fromString("rwxr-x---");
- try {
- Files.setPosixFilePermissions(basePath, perms);
- Files.setPosixFilePermissions(statePath, perms);
- } catch (final IOException e) {
- log.error("Error changing permissions for the state or base
directory {} ", stateDir.getPath(), e);
+
+ if (this.hasPersistentStores) {
+ if (!baseDir.exists() && !baseDir.mkdirs()) {
+ throw new ProcessorStateException(
+ String.format("base state directory [%s] doesn't exist and
couldn't be created", stateDirName));
+ }
+ if (!stateDir.exists() && !stateDir.mkdir()) {
+ throw new ProcessorStateException(
+ String.format("state directory [%s] doesn't exist and
couldn't be created", stateDir.getPath()));
+ }
+ if (stateDirName.startsWith("/tmp")) {
+ log.warn("Using /tmp directory in the state.dir property can
cause failures with writing the checkpoint file" +
+ " due to the fact that this directory can be cleared by
the OS");
+ }
+
+ // change the dir permission to "rwxr-x---" to avoid world readable
+ final Path basePath = Paths.get(baseDir.getPath());
+ final Path statePath = Paths.get(stateDir.getPath());
+ final Set<PosixFilePermission> perms =
PosixFilePermissions.fromString("rwxr-x---");
+ try {
+ Files.setPosixFilePermissions(basePath, perms);
+ Files.setPosixFilePermissions(statePath, perms);
+ } catch (final IOException e) {
+ log.error("Error changing permissions for the state or base
directory {} ", stateDir.getPath(), e);
+ }
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 93e81c0..5048962 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -56,6 +56,7 @@ import static
org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
@@ -126,8 +127,8 @@ public class StateDirectoryTest {
try {
final Set<PosixFilePermission> baseFilePermissions =
Files.getPosixFilePermissions(statePath);
final Set<PosixFilePermission> appFilePermissions =
Files.getPosixFilePermissions(basePath);
- assertThat(expectedPermissions.equals(baseFilePermissions),
is(true));
- assertThat(expectedPermissions.equals(appFilePermissions),
is(true));
+ assertThat(expectedPermissions, equalTo(baseFilePermissions));
+ assertThat(expectedPermissions, equalTo(appFilePermissions));
} catch (final IOException e) {
fail("Should create correct files and set correct permissions");
}
@@ -545,9 +546,13 @@ public class StateDirectoryTest {
@Test
public void shouldNotCreateBaseDirectory() throws IOException {
- initializeStateDirectory(false);
- assertFalse(stateDir.exists());
- assertFalse(appDir.exists());
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+ initializeStateDirectory(false);
+ assertThat(stateDir.exists(), is(false));
+ assertThat(appDir.exists(), is(false));
+ assertThat(appender.getMessages(),
+ not(hasItem(containsString("Error changing permissions for the
state or base directory"))));
+ }
}
@Test