mimaison commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1439616708


##########
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##########
@@ -43,36 +49,36 @@ public class DirectoryConfigProviderTest {
 
     private DirectoryConfigProvider provider;
     private File parent;
-    private File dir;
-    private File bar;
-    private File foo;
-    private File subdir;
-    private File subdirFile;
-    private File siblingDir;
-    private File siblingDirFile;
-    private File siblingFile;
-
-    private static File writeFile(File file) throws IOException {
-        Files.write(file.toPath(), 
file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
-        return file;
+    private String dir;
+    private final String bar = "bar";
+    private final String foo = "foo";
+    private String subdir;
+    private final String subdirFileName = "subdirFile";
+    private String siblingDir;
+    private final String siblingDirFileName = "siblingDirFile";
+    private final String siblingFileName = "siblingFile";
+
+    private static Path writeFile(Path path) throws IOException {
+        return Files.write(path, 
String.valueOf(path.getFileName()).toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
     }
 
     @BeforeEach
     public void setup() throws IOException {
         provider = new DirectoryConfigProvider();
         provider.configure(Collections.emptyMap());
         parent = TestUtils.tempDirectory();
-        dir = new File(parent, "dir");
-        dir.mkdir();
-        foo = writeFile(new File(dir, "foo"));
-        bar = writeFile(new File(dir, "bar"));
-        subdir = new File(dir, "subdir");
-        subdir.mkdir();
-        subdirFile = writeFile(new File(subdir, "subdirFile"));
-        siblingDir = new File(parent, "siblingdir");
-        siblingDir.mkdir();
-        siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile"));
-        siblingFile = writeFile(new File(parent, "siblingFile"));
+        
+        dir = 
String.valueOf(Files.createDirectory(Paths.get(parent.getAbsolutePath(), 
"dir")));

Review Comment:
   I think we can just use `.toString()` here as `Files.createDirectory()` 
should never return `null`.



##########
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##########
@@ -44,8 +47,23 @@ public class DirectoryConfigProvider implements 
ConfigProvider {
 
     private static final Logger log = 
LoggerFactory.getLogger(DirectoryConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";

Review Comment:
   `Path that this config` -> `A comma separated list of paths that this config`



##########
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##########
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+    private List<Path> allowedPaths;
+
     public void configure(Map<String, ?> configs) {
+        if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+            String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+            if (configValue != null && !configValue.isEmpty()) {

Review Comment:
   Should we throw or log something if this field is set to a bad value?



##########
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##########
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";
+    private List<Path> allowedPaths;
+
     public void configure(Map<String, ?> configs) {
+        if (configs.containsKey(ALLOWED_PATHS_CONFIG)) {
+            String configValue = (String) configs.get(ALLOWED_PATHS_CONFIG);
+
+            if (configValue != null && !configValue.isEmpty()) {
+                allowedPaths = new ArrayList<>();
+                Arrays.stream(configValue.split(",")).forEach(b -> 
allowedPaths.add(Paths.get(b).normalize()));
+            }
+        } else {
+            allowedPaths = null;

Review Comment:
   Could we initialize the field when it's declared and remove this `else` 
block?



##########
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##########
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";

Review Comment:
   Perhaps worth mentioning that if not set all paths are allowed



##########
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java:
##########
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.common.config.provider;
 
+import java.nio.file.Paths;

Review Comment:
   Can we put these imports with the other `java.` imports below?
   Same in the other files.



##########
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##########
@@ -40,7 +44,21 @@ public class FileConfigProvider implements ConfigProvider {
 
     private static final Logger log = 
LoggerFactory.getLogger(FileConfigProvider.class);
 
+    public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+    public static final String ALLOWED_PATHS_DOC = "Path that this config 
provider is allowed to access";

Review Comment:
   `Path that this config` -> `A comma separated list of paths that this config`



##########
clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java:
##########
@@ -83,27 +89,27 @@ public void close() throws IOException {
 
     @Test
     public void testGetAllKeysAtPath() {
-        ConfigData configData = provider.get(dir.getAbsolutePath());

Review Comment:
   `dir` changed type, it's now a String:
   ```
   - private File dir;
   + private String dir;
   ```
   So the `get()` method has not changed, it still accepts a String



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to