Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-13 Thread via GitHub


mimaison merged PR #14995:
URL: https://github.com/apache/kafka/pull/14995


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-13 Thread via GitHub


mimaison commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1942058460

   None of the test failures seem related, merging to trunk





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-08 Thread via GitHub


gharris1727 commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1935143575

   @tinaselenge Could you rebase on trunk? There was a build breakage that only affected Scala 2.12, which is now resolved.





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-07 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1481872234


##
clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java:
##
@@ -42,10 +43,12 @@ public void configure(Map<String, ?> configs) {
 }
 this.id = id.toString();
 INSTANCES.put(id.toString(), this);
+
+super.configure(configs);

Review Comment:
   nit: call super at the top of the function, here and in the 
MockVaultConfigProvider.
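   For illustration, a minimal sketch of the ordering being suggested (the class name and the mock-specific bookkeeping placeholder are illustrative, not the exact test code):
   ```java
   import java.util.Map;

   import org.apache.kafka.common.config.provider.FileConfigProvider;

   public class MockProviderSketch extends FileConfigProvider {
       @Override
       public void configure(Map<String, ?> configs) {
           // Let the parent set up allowed.paths handling before any
           // mock-specific bookkeeping runs.
           super.configure(configs);
           // ... mock-specific setup (instance registration, ids, etc.) ...
       }
   }
   ```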



##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -17,34 +17,56 @@
 package org.apache.kafka.common.config.provider;
 
 import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FileConfigProviderTest {
 
 private FileConfigProvider configProvider;
+@TempDir
+private File parent;
+private String dir;
+private String dirFile;
+private String siblingDir;
+private String siblingDirFile;
 
 @BeforeEach
-public void setup() {
+public void setup() throws IOException {
 configProvider = new TestFileConfigProvider();
+configProvider.configure(Collections.emptyMap());
+parent = TestUtils.tempDirectory();

Review Comment:
   Same as DirectoryConfigProviderTest



##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -22,57 +22,67 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Locale;
+import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.StreamSupport;
 
 import static java.util.Arrays.asList;
+
+import static 
org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class DirectoryConfigProviderTest {
 
 private DirectoryConfigProvider provider;
+@TempDir
 private File parent;
-private File dir;
-private File bar;
-private File foo;
-private File subdir;
-private File subdirFile;
-private File siblingDir;
-private File siblingDirFile;
-private File siblingFile;
-
-private static File writeFile(File file) throws IOException {
-Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
-return file;
+private String dir;
+private final String bar = "bar";
+private final String foo = "foo";
+private String subdir;
+private final String subdirFileName = "subdirFile";
+private String siblingDir;
+private final String siblingDirFileName = "siblingDirFile";
+private final String siblingFileName = "siblingFile";
+
+private static Path writeFile(Path path) throws IOException {
+return Files.write(path, 
String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
 }
 
 @BeforeEach
 public void setup() throws IOException {
 provider = new DirectoryConfigProvider();
 provider.configure(Collections.emptyMap());
+
 parent = TestUtils.tempDirectory();

Review Comment:
   This is unnecessary now with the annotation. `@TempDir` behaves like 
`@Mock`, in that the field is filled out by the test framework before the test 
execution starts.
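   For context, a minimal sketch of `@TempDir` injection in JUnit 5 (class, field and file names are illustrative):
   ```java
   import java.io.File;
   import java.nio.file.Files;

   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   class TempDirSketchTest {

       // JUnit creates this directory before each test and deletes it afterwards,
       // so no TestUtils.tempDirectory() call or manual cleanup is needed.
       @TempDir
       File parent;

       @Test
       void tempDirIsInjected() throws Exception {
           File child = Files.createFile(parent.toPath().resolve("child")).toFile();
           Assertions.assertTrue(child.exists());
       }
   }
   ```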



##
clients/src/test/java/org/apache/kafka/common/config/provider/AllowedPathsTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software 

Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-06 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1480171669


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   @gharris1727 you are right. The mock classes extending the provider class needed to be updated, as they didn't call `configure()`, which caused test failures.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-05 Thread via GitHub


gharris1727 commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1927875101

   @tinaselenge We can continue with the current design for KIP-993; it shouldn't need to change.





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-02-05 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1478774161


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   > We end up breaking tests for AbstractConfig
   
   @tinaselenge Can you clarify what you mean here?
   
   I think the AbstractConfig unconditionally calls configure() here 
immediately after instantiation: 
https://github.com/apache/kafka/blob/24a79c2613e32b1d1615f6f34d63225259740d0e/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L593-L594






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-24 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1465427349


##
clients/src/test/java/org/apache/kafka/common/config/provider/AllowedPathsTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.internals.AllowedPaths;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class AllowedPathsTest {
+
+private AllowedPaths allowedPaths;
+private String dir;
+private String myFile;
+private String dir2;
+
+@BeforeEach
+public void setup() throws IOException {
+File parent = TestUtils.tempDirectory();

Review Comment:
   nit: Make this a `@TempDir` annotated field so that it automatically gets 
cleaned up.
   https://junit.org/junit5/docs/5.4.1/api/org/junit/jupiter/api/io/TempDir.html






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-24 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1464770354


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   There is something else to consider if we go down the route of throwing an exception (either NPE or IllegalStateException) when configure() is not called first. We end up breaking tests for AbstractConfig, which resolves the config variables by instantiating ConfigProviders with the given configProvider configs and then calling the get() method (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L536). We would need to make changes to that class, possibly calling configure() first when instantiating the ConfigProviders, which may or may not have further impact elsewhere.
   
   Considering this, I'm not sure which is more valuable: defaulting to the previous no-op behaviour or introducing an exception. What do you both think?






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-22 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1462221718


##
clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AllowedPaths {
+private final List<Path> allowedPaths;
+
+/**
+ * Constructs AllowedPaths with a list of Paths retrieved from {@code 
configValue}.
+ * @param configValue {@code allowed.paths} config value which is a string 
containing comma separated list of paths
+ * @throws ConfigException if any of the given paths is not absolute or 
does not exist.
+ */
+public AllowedPaths(String configValue) {
+this.allowedPaths = getAllowedPaths(configValue);
+}
+
+private List<Path> getAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+
+Arrays.stream(configValue.split(",")).forEach(b -> {
+Path normalisedPath = Paths.get(b).normalize();
+
+if (normalisedPath.isAbsolute() && 
Files.exists(normalisedPath)) {
+allowedPaths.add(normalisedPath);
+} else {
+throw new ConfigException("Path " + normalisedPath + " is 
not valid. The path should be absolute and exist");

Review Comment:
   Can you split this into two separate errors, so that the user knows which 
condition failed?
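   A sketch of the split being asked for (the exact message wording is illustrative; imports match the quoted class):
   ```java
   // Sketch only: validate one allowed-path entry, reporting the failed
   // condition separately instead of one combined error message.
   private static Path validateAllowedPath(String entry) {
       Path normalisedPath = Paths.get(entry).normalize();
       if (!normalisedPath.isAbsolute()) {
           throw new ConfigException("Path " + normalisedPath + " is not valid: the path should be absolute");
       }
       if (!Files.exists(normalisedPath)) {
           throw new ConfigException("Path " + normalisedPath + " is not valid: the path does not exist");
       }
       return normalisedPath;
   }
   ```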






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-22 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1461985813


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   As with every `Configurable` interface, the implicit contract is that `configure()` will be called before any of the other methods. I expect many implementations would throw an NPE if this wasn't the case; after searching for two minutes I found that it's the case in `JsonConverter`, for example. I'm also not sure how one would prevent `configure()` from being called. So I think it's fine.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453854211


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   @mimaison WDYT?






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453851791


##
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##
@@ -44,8 +44,15 @@ public class DirectoryConfigProvider implements 
ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(DirectoryConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = new AllowedPaths(null);

Review Comment:
   This class is required to be thread-safe in the ConfigProvider javadoc.
   
   ```suggestion
   private volatile AllowedPaths allowedPaths = new AllowedPaths(null);
   ```






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453850678


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   > Could there be users who don't call configure first necessarily? I'm 
worried that throwing IllegalStateException could cause backward compatibility 
issue.
   
   Throwing IllegalStateException would be a backwards-incompatible change, but I don't think the old behaviour was supported in the first place: not calling configure() on these particular implementations just because it previously happened to be a no-op couples too closely to the internal implementation of these classes.
   
   It is a value judgement, and we have to determine which is more valuable. For the record, I think that exploiting the interface by preventing calls to configure() is unlikely; I was just thinking about defense in depth.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-16 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1453337557


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of 
paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   This is a good point! As suggested in the comments below, we can make the constructor take a String for the configValue and set the allow list. And here we can set it to `new AllowedPaths(null)` so that we avoid an NPE and always default to the previous behaviour of allowing all paths.
   
   Could there be users who don't necessarily call configure() first? I'm worried that throwing IllegalStateException could cause a backward compatibility issue.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-10 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1448064344


##
clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AllowedPaths {
+private List<Path> allowedPaths;
+
+private AllowedPaths(List<Path> allowedPaths) {
+this.allowedPaths = allowedPaths;
+}
+
+/**
+ * Constructs AllowedPaths with a list of Paths retrieved from {@code 
configValue}.
+ * @param configValue {@code allowed.paths} config value which is a string 
containing comma separated list of paths
+ * @return AllowedPaths with a list of Paths or null list if the {@code 
configValue} is null or empty string.
+ */
+public static AllowedPaths configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+
+Arrays.stream(configValue.split(",")).forEach(b -> {
+Path normalisedPath = Paths.get(b).normalize();
+
+if (normalisedPath.isAbsolute() && 
Files.exists(normalisedPath)) {
+allowedPaths.add(normalisedPath);
+} else {
+throw new ConfigException("Path " + normalisedPath + " is 
not valid. The path should be absolute and exist");
+}
+});
+
+return new AllowedPaths(allowedPaths);
+}
+
+return new AllowedPaths(null);
+}
+
+/**
+ * Checks if the given {@code path} resides in the configured {@code 
allowed.paths}.
+ * If {@code allowed.paths} is not configured, the given Path is returned 
as allowed.
+ * @param path the Path to check if allowed
+ * @return Path that can be accessed or null if the given Path does not 
reside in the configured {@code allowed.paths}.
+ */
+public Path getIfPathIsAllowed(Path path) {

Review Comment:
   This name sounds like it should return a boolean. I also noticed that the call sites duplicate the Paths.get() call.
   
   ```suggestion
   public Path parseUntrustedPath(String untrustedPath) {
   ```



##
clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AllowedPaths {
+private List<Path> allowedPaths;
+
+private AllowedPaths(List<Path> allowedPaths) {
+this.allowedPaths = allowedPaths;
+}
+
+/**
+ * Constructs AllowedPaths with a list of Paths retrieved from {@code 
configValue}.
+ * @param configValue {@code allowed.paths} config value which is a string 
containing comma separated list of paths
+ * @return AllowedPaths with a list of Paths or null list if the {@code 
configValue} is null or empty string.

Review Comment:
   

Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-09 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1446011456


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   I think that makes sense. I have taken the suggestion and addressed it in the latest commit; please let me know if I missed anything. Thank you.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-09 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1445993327


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+/**
+ * Reads the given {@code configValue} and creates a list of paths.
+ * @param configValue allowed.paths config value which is a string 
containing comma separated list of paths
+ * @return List of paths or null if the {@code configValue} is null or 
empty string.
+ */
+public static List<Path> configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   I think it makes sense to ensure that paths are absolute and exist. I have 
taken the suggestion in the latest commit.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441959148


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   > I did however deduplicate the implementation classes by adding 
ConfigProviderUtils class with the common methods.
   
   Rather than making these static utilities and having the caller store the List<Path>, you can keep the List<Path> as an instance variable in the shared class. This ensures that the list is handled completely by the validation logic and not changed by the config provider implementations.
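   A rough sketch of that shape, with the list kept as a private instance field (the actual PR's AllowedPaths API differs slightly, e.g. it returns the resolved Path rather than a boolean; names here are illustrative):
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.ArrayList;
   import java.util.List;

   public class AllowedPathsSketch {
       // null means "no restriction configured"; the list is never exposed to
       // the config provider implementations.
       private final List<Path> allowedPaths;

       public AllowedPathsSketch(String configValue) {
           if (configValue == null || configValue.isEmpty()) {
               this.allowedPaths = null;
           } else {
               List<Path> paths = new ArrayList<>();
               for (String entry : configValue.split(",")) {
                   paths.add(Paths.get(entry).normalize());
               }
               this.allowedPaths = paths;
           }
       }

       // Providers only ask whether a path is allowed; they never see the list.
       public boolean isAllowed(Path path) {
           if (allowedPaths == null) {
               return true;
           }
           Path normalised = path.normalize();
           return allowedPaths.stream().anyMatch(normalised::startsWith);
       }
   }
   ```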






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441931015


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+/**
+ * Reads the given {@code configValue} and creates a list of paths.
+ * @param configValue allowed.paths config value which is a string 
containing comma separated list of paths
+ * @return List of paths or null if the {@code configValue} is null or 
empty string.
+ */
+public static List<Path> configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   You still need to normalize them to remove `.` and `..` in the middle of the path (for example `/tmp/../path`), but I think it would be preferable to ensure allowed paths are absolute. Should we also ensure they exist, or are there use cases where the paths might be created later?
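   For illustration, a small standalone sketch of what normalization plus an absolute-path check gives on a Unix-style filesystem (paths are just examples):
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   public class NormalizeSketch {
       public static void main(String[] args) {
           // normalize() collapses the ".." segment: prints "/path"
           Path withDots = Paths.get("/tmp/../path");
           System.out.println(withDots.normalize());

           // A relative entry still normalizes fine, so an explicit
           // isAbsolute() check is needed to reject it: prints "false"
           Path relative = Paths.get("../path");
           System.out.println(relative.isAbsolute());
       }
   }
   ```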






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441652042


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+/**
+ * Reads the given {@code configValue} and creates a list of paths.
+ * @param configValue allowed.paths config value which is a string 
containing comma separated list of paths
+ * @return List of paths or null if the {@code configValue} is null or 
empty string.
+ */
+public static List<Path> configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   So instead of normalising the path, we don't allow any traversal at all? Should we throw an exception in that case? Or if some paths contain `..` but others don't, do we only configure the ones that are absolute, or only configure the list if all paths are absolute?






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-04 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441533761


##
clients/src/main/java/org/apache/kafka/common/config/internals/ConfigProviderUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ConfigProviderUtils {
+
+/**
+ * Reads the given {@code configValue} and creates a list of paths.
+ * @param configValue allowed.paths config value which is a string 
containing comma separated list of paths
+ * @return List of paths or null if the {@code configValue} is null or 
empty string.
+ */
+public static List<Path> configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List<Path> allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));

Review Comment:
   I wonder if we should also enforce that each path is absolute. This would avoid users configuring the allowed paths with `../path`, for example. WDYT?






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1441077572


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   I have also added tests for ConfigProviderUtils, so I guess we could remove the duplicated traversal tests, but I wonder if we should still keep the allowed-paths tests in these classes as they test the full flow. What do you think?






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876030053

   > Thanks for the PR. I made a second quick pass and left a few more comments.
   
   @mimaison Thank you very much for reviewing the PR again. I think I 
addressed your comments except the one I had a question on. Please let me know 
if I missed anything. Thanks. 





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on PR #14995:
URL: https://github.com/apache/kafka/pull/14995#issuecomment-1876028036

   > I came up with another path traversal, but this one is less severe.
   > 
   > If you try to get `/arbitrary/path/../..//dir/dirFile` you can 
perform "exists" and "isDirectory" checks for `/arbitrary/path` depending on 
the error returned.
   > 
   > (edit): This attack isn't reproducible in the tests, because they mock out 
`reader(Path)` 
   > 
   > This would allow an attacker to map out the whole filesystem and 
permissions of the current process. This could inform another attack, such as: 
finding vulnerable software, finding user accounts, finding active processes, 
etc.
   > 
   > This attack works because the normalized form is used in pathIsAllowed(), 
but the non-normalized form is used in the actual read. I think we need to make 
it so that the normalized path is used instead of the given path.
   > 
   > I noticed this caveat in the documentation for Path.normalize:
   > 
   > > This method does not access the file system; the path may not locate a 
file that exists. Eliminating ".." and a preceding name from a path may result 
in the path that locates a different file than the original path. This can 
arise when the preceding name is a symbolic link.
   > 
   > I think this would cause a backwards-incompatible change if we were to 
apply it in the default case, so we should only use the normalized form when 
`allowed.paths` is specified.
   > 
   > I tried and failed to come up with an arbitrary file-read attack using the 
symlink behavior. If the symlink is within the allowed-paths (and possibly lets 
the user escape the allowed-paths) it should probably work normally. If the 
symlink is outside of the allowed paths, we can exploit it with the same 
prefixing attack from above, but the allowed part of the path prevents actually 
reading arbitrary files. Regardless, I think that using the normalized form for 
file accesses prevents accessing external symlinks completely.
   > 
   > Thanks so much for working on this feature!
   
   
   Thank you very much @gharris1727 for the detailed comment on this issue. These are really good points and I agree; I hadn't put enough thought into this.
   
   I took the suggestions and updated it to use the normalisedPath in the 
actual read when `allowed.paths` is specified. Please let me know what you 
think. 
   
   I also tried addressing your other comments. Thank you for reviewing the PR. 
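   A condensed sketch of the fix described above: check and read the same normalized path when `allowed.paths` is set (field and method names are illustrative, not the exact PR code):
   ```java
   // Sketch only: 'allowedPaths' is assumed to be a List<Path> built from the
   // allowed.paths config (null when the config is not set).
   private Reader openIfAllowed(String untrustedPath) throws IOException {
       Path normalised = Paths.get(untrustedPath).normalize();
       if (allowedPaths != null && allowedPaths.stream().noneMatch(normalised::startsWith)) {
           // Outside the allow list: callers treat this as "no data".
           return null;
       }
       // Read the normalized form, so the path that was checked is the path
       // that is actually opened (no check/use divergence via ".." or symlinks
       // outside the allowed paths).
       return Files.newBufferedReader(normalised);
   }
   ```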





Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1440996476


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   good point! I fixed the path logic. However, I'm not sure about deduplicating 
the test classes, as they are set up slightly differently: one works with 
directories and the other with files. 
   
   I did, however, deduplicate the implementation classes by adding a 
ConfigProviderUtils class with the common methods. 
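
   For reference, a rough sketch of the kind of shared helpers being described; 
the class name is taken from the comment above, but the method names and 
signatures are illustrative guesses rather than the PR's actual API:
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;
   
   // Illustrative sketch of allowed-paths handling that could be shared by the
   // File and Directory providers.
   public final class ConfigProviderUtilsSketch {
   
       private ConfigProviderUtilsSketch() {
       }
   
       // Parse the comma-separated allowed.paths value into normalized paths; return
       // null when the value is absent or empty so callers can treat that as "allow all".
       public static List<Path> parseAllowedPaths(String configValue) {
           if (configValue == null || configValue.isEmpty()) {
               return null;
           }
           return Arrays.stream(configValue.split(","))
                   .map(p -> Paths.get(p).normalize())
                   .collect(Collectors.toList());
       }
   
       // True if the normalized path falls under any allowed path, or if no allow-list is set.
       public static boolean isAllowed(Path normalizedPath, List<Path> allowedPaths) {
           return allowedPaths == null || allowedPaths.stream().anyMatch(normalizedPath::startsWith);
       }
   }
   ```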






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-03 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1440994447


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {

Review Comment:
   If the user configures this with a null or empty value, should that be 
considered a bad value, or should it be treated the same as not setting it at 
all, therefore allowing access to all paths? 






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439665912


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dirFile + 
Paths.get("/../siblingdir/siblingdirFile"));

Review Comment:
   This is still an invalid path traversal attack. The duplicate test in 
DirectoryConfigProviderTest appears to be fine.
   
   Have you looked into deduplicating these two classes? That would also avoid 
requiring two different tests for the same traversal attack.



##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -153,5 +159,57 @@ public void testServiceLoaderDiscovery() {
 ServiceLoader serviceLoader = 
ServiceLoader.load(ConfigProvider.class);
 assertTrue(StreamSupport.stream(serviceLoader.spliterator(), 
false).anyMatch(configProvider -> configProvider instanceof 
DirectoryConfigProvider));
 }
+
+@Test
+public void testAllowedPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, parent.getAbsolutePath());
+provider.configure(configs);
+
+ConfigData configData = provider.get(dir);
+assertEquals(toSet(asList(foo, bar)), configData.data().keySet());
+assertEquals("FOO", configData.data().get(foo));
+assertEquals("BAR", configData.data().get(bar));
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+provider.configure(configs);
+
+ConfigData configData = provider.get(subdir);
+assertEquals(toSet(asList(subdirFileName)), 

Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439616708


##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -43,36 +49,36 @@ public class DirectoryConfigProviderTest {
 
 private DirectoryConfigProvider provider;
 private File parent;
-private File dir;
-private File bar;
-private File foo;
-private File subdir;
-private File subdirFile;
-private File siblingDir;
-private File siblingDirFile;
-private File siblingFile;
-
-private static File writeFile(File file) throws IOException {
-Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
-return file;
+private String dir;
+private final String bar = "bar";
+private final String foo = "foo";
+private String subdir;
+private final String subdirFileName = "subdirFile";
+private String siblingDir;
+private final String siblingDirFileName = "siblingDirFile";
+private final String siblingFileName = "siblingFile";
+
+private static Path writeFile(Path path) throws IOException {
+return Files.write(path, 
String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
 }
 
 @BeforeEach
 public void setup() throws IOException {
 provider = new DirectoryConfigProvider();
 provider.configure(Collections.emptyMap());
 parent = TestUtils.tempDirectory();
-dir = new File(parent, "dir");
-dir.mkdir();
-foo = writeFile(new File(dir, "foo"));
-bar = writeFile(new File(dir, "bar"));
-subdir = new File(dir, "subdir");
-subdir.mkdir();
-subdirFile = writeFile(new File(subdir, "subdirFile"));
-siblingDir = new File(parent, "siblingdir");
-siblingDir.mkdir();
-siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile"));
-siblingFile = writeFile(new File(parent, "siblingFile"));
+
+dir = 
String.valueOf(Files.createDirectory(Paths.get(parent.getAbsolutePath(), 
"dir")));

Review Comment:
   I think we can just use `.toString()` here as `Files.createDirectory()` 
should never return `null`.



##
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##
@@ -44,8 +47,23 @@ public class DirectoryConfigProvider implements 
ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(DirectoryConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";

Review Comment:
   `Path that this config` -> `A comma separated list of paths that this config`



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {

Review Comment:
   Should we throw or log something if this field is set to a bad value?



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+private List allowedPaths;
+
 public void configure(Map configs) {
+if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+if (configValue != null && !configValue.isEmpty()) {
+allowedPaths = new ArrayList<>();
+Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));
+}
+} else {
+allowedPaths = null;

Review Comment:
   Could we initialize the field when it's declared and remove this `else` 
block?



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {

Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439373207


##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -83,27 +89,27 @@ public void close() throws IOException {
 
 @Test
 public void testGetAllKeysAtPath() {
-ConfigData configData = provider.get(dir.getAbsolutePath());

Review Comment:
   This resulted from using `java.nio.file.Files` instead of `File`, so we can 
no longer call getAbsolutePath() to pass in a string argument. The test 
variable types were changed to String to avoid having to do string conversion 
everywhere. I didn't think changing the test variable types would break 
compatibility, since we are still passing the same type of argument to the 
method being called. 






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-02 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439364385


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dir + 
"../siblingdir/siblingdirFile");

Review Comment:
   good catch! I have fixed these now. 






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2023-12-26 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1436601837


##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dir + 
"../siblingdir/siblingdirFile");

Review Comment:
   I believe this test is invalid.
   The directory structure looks like this:
   ```
   <parent>
       dir
           dirFile
       siblingdir
           siblingdirFile
   ```
   
   This test sets up the allowed paths to be `<parent>/dir/dirFile` and 
attempts to access `<parent>/dir../siblingDir/siblingDirFile`
   
   This doesn't read any data, but only due to typos in the test:
   1. `dir..` isn't a valid directory. This can be fixed by using Paths.get() 
to perform the path extension or by inserting another `/`
   2. `<parent>/dir/../siblingDir/siblingDirFile` is a path traversal attack on 
`<parent>/dir/` and not on `<parent>/dir/dirFile`.
   
   If I fix both of these typos, the test fails because the path traversal 
attack succeeds.
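
   To make the traversal concrete, here is a small standalone demo (the directory 
names are made up): a `startsWith` check against the raw path passes, while the 
normalized form reveals the escape into the sibling directory.
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   
   // The raw request appears to live under the allowed directory, but its
   // normalized form escapes to the sibling directory.
   public class TraversalDemo {
       public static void main(String[] args) {
           Path allowedDir = Paths.get("/tmp/parent/dir");
           Path requested = Paths.get("/tmp/parent/dir/../siblingdir/siblingdirFile");
   
           System.out.println(requested.startsWith(allowedDir));             // true: the raw path passes the check
           System.out.println(requested.normalize());                        // /tmp/parent/siblingdir/siblingdirFile
           System.out.println(requested.normalize().startsWith(allowedDir)); // false: the escape shows up once normalized
       }
   }
   ```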






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2023-12-26 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1436602326


##
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##
@@ -83,27 +89,27 @@ public void close() throws IOException {
 
 @Test
 public void testGetAllKeysAtPath() {
-ConfigData configData = provider.get(dir.getAbsolutePath());

Review Comment:
   What is the importance of these changes to the existing tests? Do existing 
tests fail without changing the inputs? Is this a backwards incompatibility?



##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -98,8 +117,91 @@ public void testServiceLoaderDiscovery() {
 public static class TestFileConfigProvider extends FileConfigProvider {
 
 @Override
-protected Reader reader(String path) throws IOException {
+protected Reader reader(Path path) throws IOException {
 return new 
StringReader("testKey=testResult\ntestKey2=testResult2");
 }
 }
+
+@Test
+public void testAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testAllowedFilePath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(dirFile);
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testMultipleAllowedPaths() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir + "," + siblingDir);
+configProvider.configure(configs);
+
+Map result = new HashMap<>();
+result.put("testKey", "testResult");
+result.put("testKey2", "testResult2");
+
+ConfigData configData = configProvider.get(dirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+
+configData = configProvider.get(siblingDirFile);
+assertEquals(result, configData.data());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedDirPath() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dir);
+configProvider.configure(configs);
+
+ConfigData configData = configProvider.get(siblingDirFile);
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNotAllowedFilePath() throws IOException {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+//another file under the same directory
+Path dirFile2 = Files.createFile(Paths.get(dir, "dirFile2"));
+ConfigData configData = configProvider.get(dirFile2.toString());
+assertTrue(configData.data().isEmpty());
+assertNull(configData.ttl());
+}
+
+@Test
+public void testNoTraversal() {
+Map configs = new HashMap<>();
+configs.put(ALLOWED_PATHS_CONFIG, dirFile);
+configProvider.configure(configs);
+
+// Check we can't escape outside the path directory
+ConfigData configData = configProvider.get(dir + 
"../siblingdir/siblingdirFile");

Review Comment:
   I believe this test is invalid.
   The directory structure looks like this:
   ```
   <parent>
       dir
           dirFile
       siblingdir
           siblingdirFile
   ```
   
   This test sets up the allowed paths to be `<parent>/dir/dirFile` and 
attempts to access `<parent>/dir../siblingDir/siblingDirFile`
   
   This doesn't read any data, but only due to typos in the test:
   1. `dir..` isn't a valid directory. This can be fixed by using Paths.get() 
to perform the path extension or by inserting another `/`
   2. `<parent>/dir/../siblingDirFile` is a path traversal attack on 
`<parent>/dir/` and not on `<parent>/dir/dirFile`.
   
   If I fix both of these typos, the test fails because the path traversal 
attack succeeds.






Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2023-12-19 Thread via GitHub


mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1431221328


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -84,7 +112,17 @@ public ConfigData get(String path, Set keys) {
 if (path == null || path.isEmpty()) {
 return new ConfigData(data);
 }
-try (Reader reader = reader(path)) {
+
+Path filePath = new File(path).toPath();

Review Comment:
   Can we use `Paths.get()` instead of creating a `File`?
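
   For reference, a tiny standalone comparison (the path is arbitrary): both 
forms resolve against the default filesystem and yield equal `Path` objects; 
`Paths.get()` simply skips the intermediate `File`.
   
   ```java
   import java.io.File;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   
   // Both forms produce an equivalent Path on the default filesystem;
   // Paths.get() avoids allocating the intermediate File object.
   public class PathsVsFile {
       public static void main(String[] args) {
           Path viaFile = new File("/etc/kafka/secrets/foo").toPath();
           Path viaPaths = Paths.get("/etc/kafka/secrets/foo");
           System.out.println(viaFile.equals(viaPaths)); // true
       }
   }
   ```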



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -84,7 +112,17 @@ public ConfigData get(String path, Set keys) {
 if (path == null || path.isEmpty()) {
 return new ConfigData(data);
 }
-try (Reader reader = reader(path)) {
+
+Path filePath = new File(path).toPath();

Review Comment:
   Same in a couple of other places



##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -84,7 +112,17 @@ public ConfigData get(String path, Set keys) {
 if (path == null || path.isEmpty()) {
 return new ConfigData(data);
 }
-try (Reader reader = reader(path)) {
+
+Path filePath = new File(path).toPath();
+if (allowedPaths != null) {
+long allowed = allowedPaths.stream().filter(allowedPath -> 
filePath.startsWith(allowedPath) || filePath.equals(allowedPath)).count();

Review Comment:
   Do we need the `equals()` check? I think `startsWith()` returns `true` if 
both paths are equal.
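
   (A quick standalone check with arbitrary paths confirms this: 
`java.nio.file.Path.startsWith` already returns `true` when the two paths are 
equal, so a separate `equals()` test looks redundant.)
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   
   // startsWith compares name elements, and an equal path trivially matches all
   // of its own elements, so no extra equals() check is needed.
   public class StartsWithCheck {
       public static void main(String[] args) {
           Path p = Paths.get("/etc/kafka/secrets");
           System.out.println(p.startsWith(Paths.get("/etc/kafka/secrets"))); // true (equal paths)
           System.out.println(p.startsWith(Paths.get("/etc/kafka")));         // true (proper prefix)
           System.out.println(p.startsWith(Paths.get("/etc/kafka/other")));   // false
       }
   }
   ```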



##
clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java:
##
@@ -29,22 +32,36 @@
 import java.util.ServiceLoader;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.kafka.common.config.provider.DirectoryConfigProvider.ALLOWED_PATHS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FileConfigProviderTest {
 
 private FileConfigProvider configProvider;
+private File parent;
+private File dir;
+private File dirFile;
+private File siblingDir;
+private File siblingDirFile;
 
 @BeforeEach
 public void setup() {
 configProvider = new TestFileConfigProvider();
+configProvider.configure(Collections.emptyMap());
+parent = TestUtils.tempDirectory();
+dir = new File(parent, "dir");

Review Comment:
   Can we use `java.nio.file.Files` instead of `File`?






[PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2023-12-12 Thread via GitHub


tinaselenge opened a new pull request, #14995:
URL: https://github.com/apache/kafka/pull/14995

   This PR implements 
[KIP-993](https://cwiki.apache.org/confluence/display/KAFKA/KIP-993%3A+Allow+restricting+files+accessed+by+File+and+Directory+ConfigProviders)
 for restricting files accessed by File and Directory ConfigProviders. 
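   
   As an illustration of how the restriction could be wired up (the provider 
name and the paths below are made-up examples; `allowed.paths` is the config 
introduced by this PR):
   
   ```properties
   # Example configuration using the standard config.providers syntax; paths are illustrative.
   config.providers=file
   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
   config.providers.file.param.allowed.paths=/etc/kafka/secrets,/var/run/credentials
   ```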
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

