dlmarion commented on code in PR #35:
URL: 
https://github.com/apache/accumulo-classloaders/pull/35#discussion_r2676988070


##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java:
##########
@@ -206,70 +121,168 @@ void resetForTests() {
 
   @Override
   public void init(ContextClassLoaderEnvironment env) {
-    baseCacheDir = 
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
+    String value = 
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
         "Property " + Constants.CACHE_DIR_PROPERTY + " not set, cannot create 
cache directory.");
     String graceProp = 
env.getConfiguration().get(Constants.UPDATE_FAILURE_GRACE_PERIOD_MINS);
     long graceMins = graceProp == null ? 0 : Long.parseLong(graceProp);
     updateFailureGracePeriodMins = Duration.ofMinutes(graceMins);
+    final Path baseCacheDir;
+    if (value.startsWith("file:")) {
+      try {
+        baseCacheDir = Path.of(new URL(value).toURI());
+      } catch (IOException | URISyntaxException e) {
+        throw new IllegalArgumentException(
+            "Malformed file: URL specified for base directory: " + value, e);
+      }
+    } else if (value.startsWith("/")) {
+      baseCacheDir = Path.of(value);
+    } else {
+      throw new IllegalArgumentException(
+          "Base directory is neither a file URL nor an absolute file path: " + 
value);
+    }
     try {
-      CacheUtils.createBaseCacheDir(baseCacheDir);
-    } catch (IOException | ContextClassLoaderException e) {
-      throw new IllegalStateException("Error creating base cache directory at 
" + baseCacheDir, e);
+      localStore.set(new LocalStore(baseCacheDir));
+    } catch (IOException e) {
+      throw new UncheckedIOException("Unable to create the local storage area 
at " + baseCacheDir,
+          e);
     }
   }
 
-  ConcurrentHashMap<String,ContextDefinition> contextDefs = new 
ConcurrentHashMap<>();
-
   @Override
   public ClassLoader getClassLoader(final String contextLocation)
       throws ContextClassLoaderException {
-    Preconditions.checkState(baseCacheDir != null, "init not called before 
calling getClassLoader");
-    requireNonNull(contextLocation, "context name must be supplied");
-    final AtomicBoolean newlyCreated = new AtomicBoolean(false);
-    final AtomicReference<URLClassLoader> cl = new AtomicReference<>();
-    ContextDefinition def;
+    Preconditions.checkState(localStore.get() != null,
+        "init not called before calling getClassLoader");
+    requireNonNull(contextLocation, "context location must be supplied");
+    final var classloader = new AtomicReference<URLClassLoader>();
     try {
-      def = contextDefs.compute(contextLocation, (k, v) -> {
-        ContextDefinition def2;
-        if (v == null) {
-          newlyCreated.set(true);
+      // get the current definition, or create it from the location if it 
doesn't exist; this has
+      // the side effect of creating and caching a URLClassLoader instance if 
it doesn't exist for
+      // the computed definition
+      contextDefs.compute(contextLocation,
+          (contextLocationKey, previousDefinition) -> 
computeDefinitionAndClassLoader(classloader,
+              contextLocationKey, previousDefinition));
+    } catch (RuntimeException e) {
+      throw new ContextClassLoaderException(e.getMessage(), e);
+    }
+    return classloader.get();
+  }
+
+  private ContextDefinition computeDefinitionAndClassLoader(
+      AtomicReference<URLClassLoader> resultHolder, String contextLocation,
+      ContextDefinition previousDefinition) {
+    ContextDefinition computedDefinition;
+    if (previousDefinition == null) {
+      try {
+        URL url = new URL(contextLocation);
+        computedDefinition = ContextDefinition.fromRemoteURL(url);
+        monitorContext(contextLocation, 
computedDefinition.getMonitorIntervalSeconds());

Review Comment:
   This will schedule a task to monitor the context definition for changes 
before we know that we can download all of the resources and create a valid 
ClassLoader for it. 



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java:
##########
@@ -206,70 +121,168 @@ void resetForTests() {
 
   @Override
   public void init(ContextClassLoaderEnvironment env) {
-    baseCacheDir = 
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
+    String value = 
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
         "Property " + Constants.CACHE_DIR_PROPERTY + " not set, cannot create 
cache directory.");
     String graceProp = 
env.getConfiguration().get(Constants.UPDATE_FAILURE_GRACE_PERIOD_MINS);
     long graceMins = graceProp == null ? 0 : Long.parseLong(graceProp);
     updateFailureGracePeriodMins = Duration.ofMinutes(graceMins);
+    final Path baseCacheDir;
+    if (value.startsWith("file:")) {
+      try {
+        baseCacheDir = Path.of(new URL(value).toURI());
+      } catch (IOException | URISyntaxException e) {
+        throw new IllegalArgumentException(
+            "Malformed file: URL specified for base directory: " + value, e);
+      }
+    } else if (value.startsWith("/")) {
+      baseCacheDir = Path.of(value);
+    } else {
+      throw new IllegalArgumentException(
+          "Base directory is neither a file URL nor an absolute file path: " + 
value);
+    }
     try {

Review Comment:
   I think we should validate that `baseCacheDir` exists at this point, and 
fail if it does not. If `LocalStore` is modified in the future such that the 
directory creation is moved out of its constructor, then `init` will no longer 
fail in the same way.



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing 
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it 
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON 
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this 
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for 
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file 
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are 
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of 
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with 
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file 
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This 
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being 
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead 
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to 
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped 
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't 
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are 
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order 
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid 
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will 
be returned, which
+ * point to the local files that have been downloaded for the context, in the 
same order as
+ * specified in the {@link ContextDefinition} file, to be used for 
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+  private static final String PID = 
Long.toString(ProcessHandle.current().pid());
+
+  private final Path contextsDir;
+  private final Path resourcesDir;
+
+  public LocalStore(final Path baseDir) throws IOException {
+    this.contextsDir = 
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+    this.resourcesDir = baseDir.resolve("resources");
+    Files.createDirectories(contextsDir);
+    Files.createDirectories(resourcesDir);
+  }
+
+  Path contextsDir() {
+    return contextsDir;
+  }
+
+  Path resourcesDir() {
+    return resourcesDir;
+  }
+
+  // pattern to match regular files that have at least one non-dot character 
preceding a dot and a
+  // non-zero suffix; these files can be easily converted so the local store 
retains the original
+  // file name extension, while non-matching files will not attempt to retain 
the original file name
+  // extension, and will instead just append the checksum to the original file 
name
+  private static Pattern fileNamesWithExtensionPattern = 
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+  static String localName(String remoteFileName, String checksum) {
+    requireNonNull(remoteFileName);
+    requireNonNull(checksum);
+    var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+    if (matcher.matches()) {
+      return String.format("%s-%s.%s", matcher.group(1), checksum, 
matcher.group(2));
+    }
+    return String.format("%s-%s", remoteFileName, checksum);
+  }
+
+  private static String tempName(String baseName) {
+    return "." + requireNonNull(baseName) + "_PID" + PID + "_" + 
UUID.randomUUID() + ".tmp";
+  }
+
+  public URL[] storeContextResources(final ContextDefinition 
contextDefinition) throws IOException {
+    requireNonNull(contextDefinition, "definition must be supplied");
+    // use a LinkedHashSet to preserve the order of the context resources
+    final Set<Path> localFiles = new LinkedHashSet<>();
+    // store it with a .json suffix, if the original file didn't have one
+    final String origSourceName = contextDefinition.getSourceFileName();
+    final String sourceNameWithSuffix =
+        origSourceName.toLowerCase().endsWith(".json") ? origSourceName : 
origSourceName + ".json";
+    final String destinationName = localName(sourceNameWithSuffix, 
contextDefinition.getChecksum());
+    try {
+      storeContextDefinition(contextDefinition, destinationName);
+      boolean successful = false;
+      while (!successful) {
+        localFiles.clear();
+        for (Resource resource : contextDefinition.getResources()) {
+          Path path = storeResource(resource);
+          if (path == null) {
+            LOG.debug("Skipped resource {} while another process or thread is 
downloading it",
+                resource.getLocation());
+            continue;
+          }
+          localFiles.add(path);
+          LOG.trace("Added resource {} to classpath", path);
+        }
+        successful = localFiles.size() == 
contextDefinition.getResources().size();
+      }
+
+    } catch (IOException | RuntimeException e) {
+      LOG.error("Error initializing context: " + destinationName, e);

Review Comment:
   ```suggestion
         LOG.error("Error storing resources for context: {}", 
contextDefinition.getSourceFileName(), e);
   ```



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing 
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it 
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON 
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this 
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for 
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file 
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are 
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of 
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with 
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file 
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This 
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being 
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead 
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to 
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped 
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't 
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are 
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order 
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid 
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will 
be returned, which
+ * point to the local files that have been downloaded for the context, in the 
same order as
+ * specified in the {@link ContextDefinition} file, to be used for 
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+  private static final String PID = 
Long.toString(ProcessHandle.current().pid());
+
+  private final Path contextsDir;
+  private final Path resourcesDir;
+
+  public LocalStore(final Path baseDir) throws IOException {
+    this.contextsDir = 
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+    this.resourcesDir = baseDir.resolve("resources");
+    Files.createDirectories(contextsDir);
+    Files.createDirectories(resourcesDir);
+  }
+
+  Path contextsDir() {
+    return contextsDir;
+  }
+
+  Path resourcesDir() {
+    return resourcesDir;
+  }
+
+  // pattern to match regular files that have at least one non-dot character 
preceding a dot and a
+  // non-zero suffix; these files can be easily converted so the local store 
retains the original
+  // file name extension, while non-matching files will not attempt to retain 
the original file name
+  // extension, and will instead just append the checksum to the original file 
name
+  private static Pattern fileNamesWithExtensionPattern = 
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+  static String localName(String remoteFileName, String checksum) {
+    requireNonNull(remoteFileName);
+    requireNonNull(checksum);
+    var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+    if (matcher.matches()) {
+      return String.format("%s-%s.%s", matcher.group(1), checksum, 
matcher.group(2));
+    }
+    return String.format("%s-%s", remoteFileName, checksum);
+  }
+
+  private static String tempName(String baseName) {
+    return "." + requireNonNull(baseName) + "_PID" + PID + "_" + 
UUID.randomUUID() + ".tmp";
+  }
+
+  public URL[] storeContextResources(final ContextDefinition 
contextDefinition) throws IOException {
+    requireNonNull(contextDefinition, "definition must be supplied");
+    // use a LinkedHashSet to preserve the order of the context resources
+    final Set<Path> localFiles = new LinkedHashSet<>();
+    // store it with a .json suffix, if the original file didn't have one
+    final String origSourceName = contextDefinition.getSourceFileName();
+    final String sourceNameWithSuffix =
+        origSourceName.toLowerCase().endsWith(".json") ? origSourceName : 
origSourceName + ".json";
+    final String destinationName = localName(sourceNameWithSuffix, 
contextDefinition.getChecksum());
+    try {
+      storeContextDefinition(contextDefinition, destinationName);
+      boolean successful = false;
+      while (!successful) {
+        localFiles.clear();
+        for (Resource resource : contextDefinition.getResources()) {
+          Path path = storeResource(resource);
+          if (path == null) {
+            LOG.debug("Skipped resource {} while another process or thread is 
downloading it",
+                resource.getLocation());
+            continue;
+          }
+          localFiles.add(path);
+          LOG.trace("Added resource {} to classpath", path);
+        }
+        successful = localFiles.size() == 
contextDefinition.getResources().size();
+      }
+
+    } catch (IOException | RuntimeException e) {
+      LOG.error("Error initializing context: " + destinationName, e);
+      throw e;
+    }
+    return localFiles.stream().map(p -> {
+      try {
+        return p.toUri().toURL();
+      } catch (MalformedURLException e) {
+        // this shouldn't happen since these are local file paths
+        throw new UncheckedIOException(e);
+      }
+    }).toArray(URL[]::new);
+  }
+
+  private void storeContextDefinition(final ContextDefinition 
contextDefinition,
+      final String destinationName) throws IOException {
+    Path destinationPath = contextsDir.resolve(destinationName);
+    if (Files.exists(destinationPath)) {
+      return;
+    }
+    Path tempPath = contextsDir.resolve(tempName(destinationName));
+    Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8), 
CREATE_NEW);
+    Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+  }
+
+  private Path storeResource(final Resource resource) throws IOException {
+    final URL url = resource.getLocation();
+    final FileResolver source = FileResolver.resolve(url);
+    final String baseName = localName(source.getFileName(), 
resource.getChecksum());
+    final Path destinationPath = resourcesDir.resolve(baseName);
+    final Path tempPath = resourcesDir.resolve(tempName(baseName));
+    final Path inProgressPath = resourcesDir.resolve("." + baseName + 
".downloading");
+
+    if (Files.exists(destinationPath)) {
+      LOG.trace("Resource {} is already cached at {}", url, destinationPath);
+      return destinationPath;
+    }
+
+    try {
+      if (System.currentTimeMillis() - 
Files.getLastModifiedTime(inProgressPath).toMillis() < 30_000
+          || Files.deleteIfExists(inProgressPath)) {
+        return null;
+      }
+    } catch (NoSuchFileException e) {
+      // this is okay, nobody else is downloading the file, so we can try
+    }
+
+    try {
+      Files.write(inProgressPath, PID.getBytes(UTF_8), CREATE_NEW);
+    } catch (FileAlreadyExistsException e) {
+      // somebody else beat us to it, let them try to download it; we'll check 
back later
+      return null;
+    }
+
+    var task = new FutureTask<Void>(() -> downloadFile(source, tempPath, 
resource), null);
+    var t = new Thread(task);
+    t.setDaemon(true);
+    t.setName("downloading " + url + " to " + tempPath);
+
+    LOG.trace("Storing remote resource {} locally at {} via temp file {}", 
url, destinationPath,
+        tempPath);
+    t.start();
+    try {
+      while (!task.isDone()) {
+        try {
+          Files.write(inProgressPath, PID.getBytes(UTF_8), TRUNCATE_EXISTING);
+        } catch (IOException e) {
+          LOG.warn(
+              "Error writing progress file {}. Other processes may attempt 
downloading the same file.",
+              inProgressPath, e);
+        }
+        try {
+          task.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+          // timeout while waiting for task to complete; do nothing; keep 
waiting
+          LOG.trace("Still making progress downloading {}", tempPath);
+        } catch (InterruptedException e) {
+          task.cancel(true);
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(
+              "Thread was interrupted while waiting on resource to copy from " 
+ url + " to "
+                  + tempPath,
+              e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException("Error copying resource from " + url 
+ " to " + tempPath,
+              e);
+        }
+      }
+

Review Comment:
   Do you need to call `t.join()`?



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/DeduplicationCache.java:
##########


Review Comment:
   Should the private members be marked as `final`?



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing 
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it 
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON 
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this 
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for 
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file 
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are 
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of 
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with 
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file 
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This 
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being 
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead 
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to 
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped 
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't 
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are 
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order 
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid 
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will 
be returned, which
+ * point to the local files that have been downloaded for the context, in the 
same order as
+ * specified in the {@link ContextDefinition} file, to be used for 
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+  private static final String PID = 
Long.toString(ProcessHandle.current().pid());
+
+  private final Path contextsDir;
+  private final Path resourcesDir;
+
+  public LocalStore(final Path baseDir) throws IOException {
+    this.contextsDir = 
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+    this.resourcesDir = baseDir.resolve("resources");
+    Files.createDirectories(contextsDir);
+    Files.createDirectories(resourcesDir);
+  }
+
+  Path contextsDir() {
+    return contextsDir;
+  }
+
+  Path resourcesDir() {
+    return resourcesDir;
+  }
+
+  // pattern to match regular files that have at least one non-dot character 
preceding a dot and a
+  // non-zero suffix; these files can be easily converted so the local store 
retains the original
+  // file name extension, while non-matching files will not attempt to retain 
the original file name
+  // extension, and will instead just append the checksum to the original file 
name
+  private static Pattern fileNamesWithExtensionPattern = 
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+  static String localName(String remoteFileName, String checksum) {
+    requireNonNull(remoteFileName);
+    requireNonNull(checksum);
+    var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+    if (matcher.matches()) {
+      return String.format("%s-%s.%s", matcher.group(1), checksum, 
matcher.group(2));
+    }
+    return String.format("%s-%s", remoteFileName, checksum);
+  }
+
+  private static String tempName(String baseName) {
+    return "." + requireNonNull(baseName) + "_PID" + PID + "_" + 
UUID.randomUUID() + ".tmp";
+  }
+
+  public URL[] storeContextResources(final ContextDefinition 
contextDefinition) throws IOException {
+    requireNonNull(contextDefinition, "definition must be supplied");
+    // use a LinkedHashSet to preserve the order of the context resources
+    final Set<Path> localFiles = new LinkedHashSet<>();
+    // store it with a .json suffix, if the original file didn't have one
+    final String origSourceName = contextDefinition.getSourceFileName();
+    final String sourceNameWithSuffix =
+        origSourceName.toLowerCase().endsWith(".json") ? origSourceName : 
origSourceName + ".json";
+    final String destinationName = localName(sourceNameWithSuffix, 
contextDefinition.getChecksum());
+    try {
+      storeContextDefinition(contextDefinition, destinationName);
+      boolean successful = false;
+      while (!successful) {
+        localFiles.clear();
+        for (Resource resource : contextDefinition.getResources()) {
+          Path path = storeResource(resource);
+          if (path == null) {
+            LOG.debug("Skipped resource {} while another process or thread is 
downloading it",
+                resource.getLocation());
+            continue;
+          }
+          localFiles.add(path);
+          LOG.trace("Added resource {} to classpath", path);
+        }
+        successful = localFiles.size() == 
contextDefinition.getResources().size();
+      }
+
+    } catch (IOException | RuntimeException e) {
+      LOG.error("Error initializing context: " + destinationName, e);
+      throw e;
+    }
+    return localFiles.stream().map(p -> {
+      try {
+        return p.toUri().toURL();
+      } catch (MalformedURLException e) {
+        // this shouldn't happen since these are local file paths
+        throw new UncheckedIOException(e);
+      }
+    }).toArray(URL[]::new);
+  }
+
+  private void storeContextDefinition(final ContextDefinition 
contextDefinition,
+      final String destinationName) throws IOException {
+    Path destinationPath = contextsDir.resolve(destinationName);
+    if (Files.exists(destinationPath)) {
+      return;
+    }
+    Path tempPath = contextsDir.resolve(tempName(destinationName));
+    Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8), 
CREATE_NEW);
+    Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+  }
+
+  private Path storeResource(final Resource resource) throws IOException {
+    final URL url = resource.getLocation();
+    final FileResolver source = FileResolver.resolve(url);
+    final String baseName = localName(source.getFileName(), 
resource.getChecksum());
+    final Path destinationPath = resourcesDir.resolve(baseName);
+    final Path tempPath = resourcesDir.resolve(tempName(baseName));
+    final Path inProgressPath = resourcesDir.resolve("." + baseName + 
".downloading");
+
+    if (Files.exists(destinationPath)) {
+      LOG.trace("Resource {} is already cached at {}", url, destinationPath);
+      return destinationPath;
+    }
+
+    try {
+      if (System.currentTimeMillis() - 
Files.getLastModifiedTime(inProgressPath).toMillis() < 30_000

Review Comment:
   We may want to make the 30-second timeout configurable. In a thundering-herd 
scenario, requests to download a jar file may be sitting in a request queue 
waiting to be serviced, not stuck waiting on I/O. Removing the in-progress file 
so that the download can be retried (by this process or another) may make the 
problem worse. Users may need a longer timeout, and with the value hard-coded 
they won't be able to change it.



##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing 
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it 
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON 
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this 
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for 
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file 
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are 
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of 
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with 
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file 
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This 
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being 
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead 
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to 
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped 
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't 
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are 
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order 
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid 
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will 
be returned, which
+ * point to the local files that have been downloaded for the context, in the 
same order as
+ * specified in the {@link ContextDefinition} file, to be used for 
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+  private static final String PID = 
Long.toString(ProcessHandle.current().pid());
+
+  private final Path contextsDir;
+  private final Path resourcesDir;
+
+  public LocalStore(final Path baseDir) throws IOException {
+    this.contextsDir = 
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+    this.resourcesDir = baseDir.resolve("resources");
+    Files.createDirectories(contextsDir);
+    Files.createDirectories(resourcesDir);
+  }
+
+  Path contextsDir() {
+    return contextsDir;
+  }
+
+  Path resourcesDir() {
+    return resourcesDir;
+  }
+
+  // pattern to match regular files that have at least one non-dot character 
preceding a dot and a
+  // non-zero suffix; these files can be easily converted so the local store 
retains the original
+  // file name extension, while non-matching files will not attempt to retain 
the original file name
+  // extension, and will instead just append the checksum to the original file 
name
+  private static Pattern fileNamesWithExtensionPattern = 
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+  static String localName(String remoteFileName, String checksum) {
+    requireNonNull(remoteFileName);
+    requireNonNull(checksum);
+    var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+    if (matcher.matches()) {
+      return String.format("%s-%s.%s", matcher.group(1), checksum, 
matcher.group(2));
+    }
+    return String.format("%s-%s", remoteFileName, checksum);
+  }
+
+  private static String tempName(String baseName) {
+    return "." + requireNonNull(baseName) + "_PID" + PID + "_" + 
UUID.randomUUID() + ".tmp";
+  }
+
+  public URL[] storeContextResources(final ContextDefinition 
contextDefinition) throws IOException {
+    requireNonNull(contextDefinition, "definition must be supplied");
+    // use a LinkedHashSet to preserve the order of the context resources
+    final Set<Path> localFiles = new LinkedHashSet<>();
+    // store it with a .json suffix, if the original file didn't have one
+    final String origSourceName = contextDefinition.getSourceFileName();
+    final String sourceNameWithSuffix =
+        origSourceName.toLowerCase().endsWith(".json") ? origSourceName : 
origSourceName + ".json";
+    final String destinationName = localName(sourceNameWithSuffix, 
contextDefinition.getChecksum());
+    try {
+      storeContextDefinition(contextDefinition, destinationName);
+      boolean successful = false;
+      while (!successful) {
+        localFiles.clear();
+        for (Resource resource : contextDefinition.getResources()) {
+          Path path = storeResource(resource);
+          if (path == null) {
+            LOG.debug("Skipped resource {} while another process or thread is 
downloading it",
+                resource.getLocation());
+            continue;
+          }
+          localFiles.add(path);
+          LOG.trace("Added resource {} to classpath", path);
+        }
+        successful = localFiles.size() == 
contextDefinition.getResources().size();
+      }
+
+    } catch (IOException | RuntimeException e) {
+      LOG.error("Error initializing context: " + destinationName, e);
+      throw e;
+    }
+    return localFiles.stream().map(p -> {
+      try {
+        return p.toUri().toURL();
+      } catch (MalformedURLException e) {
+        // this shouldn't happen since these are local file paths
+        throw new UncheckedIOException(e);
+      }
+    }).toArray(URL[]::new);
+  }
+
+  private void storeContextDefinition(final ContextDefinition 
contextDefinition,
+      final String destinationName) throws IOException {
+    Path destinationPath = contextsDir.resolve(destinationName);
+    if (Files.exists(destinationPath)) {
+      return;
+    }
+    Path tempPath = contextsDir.resolve(tempName(destinationName));
+    Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8), 
CREATE_NEW);
+    Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+  }
+
+  private Path storeResource(final Resource resource) throws IOException {
+    final URL url = resource.getLocation();
+    final FileResolver source = FileResolver.resolve(url);
+    final String baseName = localName(source.getFileName(), 
resource.getChecksum());
+    final Path destinationPath = resourcesDir.resolve(baseName);
+    final Path tempPath = resourcesDir.resolve(tempName(baseName));
+    final Path inProgressPath = resourcesDir.resolve("." + baseName + 
".downloading");
+
+    if (Files.exists(destinationPath)) {
+      LOG.trace("Resource {} is already cached at {}", url, destinationPath);

Review Comment:
   There is a comment in the finally block about ignoring exceptions when trying 
to delete the in-progress file. I wonder if we want to call 
`Files.deleteIfExists(inProgressPath);` here before returning, to try to clean 
up any existing in-progress file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to