dlmarion commented on code in PR #35:
URL:
https://github.com/apache/accumulo-classloaders/pull/35#discussion_r2676988070
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java:
##########
@@ -206,70 +121,168 @@ void resetForTests() {
@Override
public void init(ContextClassLoaderEnvironment env) {
- baseCacheDir =
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
+ String value =
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
"Property " + Constants.CACHE_DIR_PROPERTY + " not set, cannot create
cache directory.");
String graceProp =
env.getConfiguration().get(Constants.UPDATE_FAILURE_GRACE_PERIOD_MINS);
long graceMins = graceProp == null ? 0 : Long.parseLong(graceProp);
updateFailureGracePeriodMins = Duration.ofMinutes(graceMins);
+ final Path baseCacheDir;
+ if (value.startsWith("file:")) {
+ try {
+ baseCacheDir = Path.of(new URL(value).toURI());
+ } catch (IOException | URISyntaxException e) {
+ throw new IllegalArgumentException(
+ "Malformed file: URL specified for base directory: " + value, e);
+ }
+ } else if (value.startsWith("/")) {
+ baseCacheDir = Path.of(value);
+ } else {
+ throw new IllegalArgumentException(
+ "Base directory is neither a file URL nor an absolute file path: " +
value);
+ }
try {
- CacheUtils.createBaseCacheDir(baseCacheDir);
- } catch (IOException | ContextClassLoaderException e) {
- throw new IllegalStateException("Error creating base cache directory at
" + baseCacheDir, e);
+ localStore.set(new LocalStore(baseCacheDir));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to create the local storage area
at " + baseCacheDir,
+ e);
}
}
- ConcurrentHashMap<String,ContextDefinition> contextDefs = new
ConcurrentHashMap<>();
-
@Override
public ClassLoader getClassLoader(final String contextLocation)
throws ContextClassLoaderException {
- Preconditions.checkState(baseCacheDir != null, "init not called before
calling getClassLoader");
- requireNonNull(contextLocation, "context name must be supplied");
- final AtomicBoolean newlyCreated = new AtomicBoolean(false);
- final AtomicReference<URLClassLoader> cl = new AtomicReference<>();
- ContextDefinition def;
+ Preconditions.checkState(localStore.get() != null,
+ "init not called before calling getClassLoader");
+ requireNonNull(contextLocation, "context location must be supplied");
+ final var classloader = new AtomicReference<URLClassLoader>();
try {
- def = contextDefs.compute(contextLocation, (k, v) -> {
- ContextDefinition def2;
- if (v == null) {
- newlyCreated.set(true);
+ // get the current definition, or create it from the location if it
doesn't exist; this has
+ // the side effect of creating and caching a URLClassLoader instance if
it doesn't exist for
+ // the computed definition
+ contextDefs.compute(contextLocation,
+ (contextLocationKey, previousDefinition) ->
computeDefinitionAndClassLoader(classloader,
+ contextLocationKey, previousDefinition));
+ } catch (RuntimeException e) {
+ throw new ContextClassLoaderException(e.getMessage(), e);
+ }
+ return classloader.get();
+ }
+
+ private ContextDefinition computeDefinitionAndClassLoader(
+ AtomicReference<URLClassLoader> resultHolder, String contextLocation,
+ ContextDefinition previousDefinition) {
+ ContextDefinition computedDefinition;
+ if (previousDefinition == null) {
+ try {
+ URL url = new URL(contextLocation);
+ computedDefinition = ContextDefinition.fromRemoteURL(url);
+ monitorContext(contextLocation,
computedDefinition.getMonitorIntervalSeconds());
Review Comment:
This will schedule a task to monitor the context definition for changes
before we know that we can download all of the resources and create a valid
ClassLoader for it.
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java:
##########
@@ -206,70 +121,168 @@ void resetForTests() {
@Override
public void init(ContextClassLoaderEnvironment env) {
- baseCacheDir =
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
+ String value =
requireNonNull(env.getConfiguration().get(Constants.CACHE_DIR_PROPERTY),
"Property " + Constants.CACHE_DIR_PROPERTY + " not set, cannot create
cache directory.");
String graceProp =
env.getConfiguration().get(Constants.UPDATE_FAILURE_GRACE_PERIOD_MINS);
long graceMins = graceProp == null ? 0 : Long.parseLong(graceProp);
updateFailureGracePeriodMins = Duration.ofMinutes(graceMins);
+ final Path baseCacheDir;
+ if (value.startsWith("file:")) {
+ try {
+ baseCacheDir = Path.of(new URL(value).toURI());
+ } catch (IOException | URISyntaxException e) {
+ throw new IllegalArgumentException(
+ "Malformed file: URL specified for base directory: " + value, e);
+ }
+ } else if (value.startsWith("/")) {
+ baseCacheDir = Path.of(value);
+ } else {
+ throw new IllegalArgumentException(
+ "Base directory is neither a file URL nor an absolute file path: " +
value);
+ }
try {
Review Comment:
I think we should validate that `baseCacheDir` exists at this point, and
fail if not. If LocalStore is modified in the future such that the directory
creation is moved out of the constructor, then this will not fail in the same
way.
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will
be returned, which
+ * point to the local files that have been downloaded for the context, in the
same order as
+ * specified in the {@link ContextDefinition} file, to be used for
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+ private static final String PID =
Long.toString(ProcessHandle.current().pid());
+
+ private final Path contextsDir;
+ private final Path resourcesDir;
+
+ public LocalStore(final Path baseDir) throws IOException {
+ this.contextsDir =
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+ this.resourcesDir = baseDir.resolve("resources");
+ Files.createDirectories(contextsDir);
+ Files.createDirectories(resourcesDir);
+ }
+
+ Path contextsDir() {
+ return contextsDir;
+ }
+
+ Path resourcesDir() {
+ return resourcesDir;
+ }
+
+ // pattern to match regular files that have at least one non-dot character
preceding a dot and a
+ // non-zero suffix; these files can be easily converted so the local store
retains the original
+ // file name extension, while non-matching files will not attempt to retain
the original file name
+ // extension, and will instead just append the checksum to the original file
name
+ private static Pattern fileNamesWithExtensionPattern =
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+ static String localName(String remoteFileName, String checksum) {
+ requireNonNull(remoteFileName);
+ requireNonNull(checksum);
+ var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+ if (matcher.matches()) {
+ return String.format("%s-%s.%s", matcher.group(1), checksum,
matcher.group(2));
+ }
+ return String.format("%s-%s", remoteFileName, checksum);
+ }
+
+ private static String tempName(String baseName) {
+ return "." + requireNonNull(baseName) + "_PID" + PID + "_" +
UUID.randomUUID() + ".tmp";
+ }
+
+ public URL[] storeContextResources(final ContextDefinition
contextDefinition) throws IOException {
+ requireNonNull(contextDefinition, "definition must be supplied");
+ // use a LinkedHashSet to preserve the order of the context resources
+ final Set<Path> localFiles = new LinkedHashSet<>();
+ // store it with a .json suffix, if the original file didn't have one
+ final String origSourceName = contextDefinition.getSourceFileName();
+ final String sourceNameWithSuffix =
+ origSourceName.toLowerCase().endsWith(".json") ? origSourceName :
origSourceName + ".json";
+ final String destinationName = localName(sourceNameWithSuffix,
contextDefinition.getChecksum());
+ try {
+ storeContextDefinition(contextDefinition, destinationName);
+ boolean successful = false;
+ while (!successful) {
+ localFiles.clear();
+ for (Resource resource : contextDefinition.getResources()) {
+ Path path = storeResource(resource);
+ if (path == null) {
+ LOG.debug("Skipped resource {} while another process or thread is
downloading it",
+ resource.getLocation());
+ continue;
+ }
+ localFiles.add(path);
+ LOG.trace("Added resource {} to classpath", path);
+ }
+ successful = localFiles.size() ==
contextDefinition.getResources().size();
+ }
+
+ } catch (IOException | RuntimeException e) {
+ LOG.error("Error initializing context: " + destinationName, e);
Review Comment:
```suggestion
LOG.error("Error storing resources for context: {}",
contextDefinition.getSourceFileName(), e);
```
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will
be returned, which
+ * point to the local files that have been downloaded for the context, in the
same order as
+ * specified in the {@link ContextDefinition} file, to be used for
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+ private static final String PID =
Long.toString(ProcessHandle.current().pid());
+
+ private final Path contextsDir;
+ private final Path resourcesDir;
+
+ public LocalStore(final Path baseDir) throws IOException {
+ this.contextsDir =
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+ this.resourcesDir = baseDir.resolve("resources");
+ Files.createDirectories(contextsDir);
+ Files.createDirectories(resourcesDir);
+ }
+
+ Path contextsDir() {
+ return contextsDir;
+ }
+
+ Path resourcesDir() {
+ return resourcesDir;
+ }
+
+ // pattern to match regular files that have at least one non-dot character
preceding a dot and a
+ // non-zero suffix; these files can be easily converted so the local store
retains the original
+ // file name extension, while non-matching files will not attempt to retain
the original file name
+ // extension, and will instead just append the checksum to the original file
name
+ private static Pattern fileNamesWithExtensionPattern =
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+ static String localName(String remoteFileName, String checksum) {
+ requireNonNull(remoteFileName);
+ requireNonNull(checksum);
+ var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+ if (matcher.matches()) {
+ return String.format("%s-%s.%s", matcher.group(1), checksum,
matcher.group(2));
+ }
+ return String.format("%s-%s", remoteFileName, checksum);
+ }
+
+ private static String tempName(String baseName) {
+ return "." + requireNonNull(baseName) + "_PID" + PID + "_" +
UUID.randomUUID() + ".tmp";
+ }
+
+ public URL[] storeContextResources(final ContextDefinition
contextDefinition) throws IOException {
+ requireNonNull(contextDefinition, "definition must be supplied");
+ // use a LinkedHashSet to preserve the order of the context resources
+ final Set<Path> localFiles = new LinkedHashSet<>();
+ // store it with a .json suffix, if the original file didn't have one
+ final String origSourceName = contextDefinition.getSourceFileName();
+ final String sourceNameWithSuffix =
+ origSourceName.toLowerCase().endsWith(".json") ? origSourceName :
origSourceName + ".json";
+ final String destinationName = localName(sourceNameWithSuffix,
contextDefinition.getChecksum());
+ try {
+ storeContextDefinition(contextDefinition, destinationName);
+ boolean successful = false;
+ while (!successful) {
+ localFiles.clear();
+ for (Resource resource : contextDefinition.getResources()) {
+ Path path = storeResource(resource);
+ if (path == null) {
+ LOG.debug("Skipped resource {} while another process or thread is
downloading it",
+ resource.getLocation());
+ continue;
+ }
+ localFiles.add(path);
+ LOG.trace("Added resource {} to classpath", path);
+ }
+ successful = localFiles.size() ==
contextDefinition.getResources().size();
+ }
+
+ } catch (IOException | RuntimeException e) {
+ LOG.error("Error initializing context: " + destinationName, e);
+ throw e;
+ }
+ return localFiles.stream().map(p -> {
+ try {
+ return p.toUri().toURL();
+ } catch (MalformedURLException e) {
+ // this shouldn't happen since these are local file paths
+ throw new UncheckedIOException(e);
+ }
+ }).toArray(URL[]::new);
+ }
+
+ private void storeContextDefinition(final ContextDefinition
contextDefinition,
+ final String destinationName) throws IOException {
+ Path destinationPath = contextsDir.resolve(destinationName);
+ if (Files.exists(destinationPath)) {
+ return;
+ }
+ Path tempPath = contextsDir.resolve(tempName(destinationName));
+ Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8),
CREATE_NEW);
+ Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+ }
+
+ private Path storeResource(final Resource resource) throws IOException {
+ final URL url = resource.getLocation();
+ final FileResolver source = FileResolver.resolve(url);
+ final String baseName = localName(source.getFileName(),
resource.getChecksum());
+ final Path destinationPath = resourcesDir.resolve(baseName);
+ final Path tempPath = resourcesDir.resolve(tempName(baseName));
+ final Path inProgressPath = resourcesDir.resolve("." + baseName +
".downloading");
+
+ if (Files.exists(destinationPath)) {
+ LOG.trace("Resource {} is already cached at {}", url, destinationPath);
+ return destinationPath;
+ }
+
+ try {
+ if (System.currentTimeMillis() -
Files.getLastModifiedTime(inProgressPath).toMillis() < 30_000
+ || Files.deleteIfExists(inProgressPath)) {
+ return null;
+ }
+ } catch (NoSuchFileException e) {
+ // this is okay, nobody else is downloading the file, so we can try
+ }
+
+ try {
+ Files.write(inProgressPath, PID.getBytes(UTF_8), CREATE_NEW);
+ } catch (FileAlreadyExistsException e) {
+ // somebody else beat us to it, let them try to download it; we'll check
back later
+ return null;
+ }
+
+ var task = new FutureTask<Void>(() -> downloadFile(source, tempPath,
resource), null);
+ var t = new Thread(task);
+ t.setDaemon(true);
+ t.setName("downloading " + url + " to " + tempPath);
+
+ LOG.trace("Storing remote resource {} locally at {} via temp file {}",
url, destinationPath,
+ tempPath);
+ t.start();
+ try {
+ while (!task.isDone()) {
+ try {
+ Files.write(inProgressPath, PID.getBytes(UTF_8), TRUNCATE_EXISTING);
+ } catch (IOException e) {
+ LOG.warn(
+ "Error writing progress file {}. Other processes may attempt
downloading the same file.",
+ inProgressPath, e);
+ }
+ try {
+ task.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // timeout while waiting for task to complete; do nothing; keep
waiting
+ LOG.trace("Still making progress downloading {}", tempPath);
+ } catch (InterruptedException e) {
+ task.cancel(true);
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(
+ "Thread was interrupted while waiting on resource to copy from "
+ url + " to "
+ + tempPath,
+ e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Error copying resource from " + url
+ " to " + tempPath,
+ e);
+ }
+ }
+
Review Comment:
Do you need to call `t.join()`?
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/DeduplicationCache.java:
##########
Review Comment:
Should the private members be marked as `final`?
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will
be returned, which
+ * point to the local files that have been downloaded for the context, in the
same order as
+ * specified in the {@link ContextDefinition} file, to be used for
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+ private static final String PID =
Long.toString(ProcessHandle.current().pid());
+
+ private final Path contextsDir;
+ private final Path resourcesDir;
+
+ public LocalStore(final Path baseDir) throws IOException {
+ this.contextsDir =
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+ this.resourcesDir = baseDir.resolve("resources");
+ Files.createDirectories(contextsDir);
+ Files.createDirectories(resourcesDir);
+ }
+
+ Path contextsDir() {
+ return contextsDir;
+ }
+
+ Path resourcesDir() {
+ return resourcesDir;
+ }
+
+ // pattern to match regular files that have at least one non-dot character
preceding a dot and a
+ // non-zero suffix; these files can be easily converted so the local store
retains the original
+ // file name extension, while non-matching files will not attempt to retain
the original file name
+ // extension, and will instead just append the checksum to the original file
name
+ private static Pattern fileNamesWithExtensionPattern =
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+ static String localName(String remoteFileName, String checksum) {
+ requireNonNull(remoteFileName);
+ requireNonNull(checksum);
+ var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+ if (matcher.matches()) {
+ return String.format("%s-%s.%s", matcher.group(1), checksum,
matcher.group(2));
+ }
+ return String.format("%s-%s", remoteFileName, checksum);
+ }
+
+ private static String tempName(String baseName) {
+ return "." + requireNonNull(baseName) + "_PID" + PID + "_" +
UUID.randomUUID() + ".tmp";
+ }
+
+ public URL[] storeContextResources(final ContextDefinition
contextDefinition) throws IOException {
+ requireNonNull(contextDefinition, "definition must be supplied");
+ // use a LinkedHashSet to preserve the order of the context resources
+ final Set<Path> localFiles = new LinkedHashSet<>();
+ // store it with a .json suffix, if the original file didn't have one
+ final String origSourceName = contextDefinition.getSourceFileName();
+ final String sourceNameWithSuffix =
+ origSourceName.toLowerCase().endsWith(".json") ? origSourceName :
origSourceName + ".json";
+ final String destinationName = localName(sourceNameWithSuffix,
contextDefinition.getChecksum());
+ try {
+ storeContextDefinition(contextDefinition, destinationName);
+ boolean successful = false;
+ while (!successful) {
+ localFiles.clear();
+ for (Resource resource : contextDefinition.getResources()) {
+ Path path = storeResource(resource);
+ if (path == null) {
+ LOG.debug("Skipped resource {} while another process or thread is
downloading it",
+ resource.getLocation());
+ continue;
+ }
+ localFiles.add(path);
+ LOG.trace("Added resource {} to classpath", path);
+ }
+ successful = localFiles.size() ==
contextDefinition.getResources().size();
+ }
+
+ } catch (IOException | RuntimeException e) {
+ LOG.error("Error initializing context: " + destinationName, e);
+ throw e;
+ }
+ return localFiles.stream().map(p -> {
+ try {
+ return p.toUri().toURL();
+ } catch (MalformedURLException e) {
+ // this shouldn't happen since these are local file paths
+ throw new UncheckedIOException(e);
+ }
+ }).toArray(URL[]::new);
+ }
+
+ private void storeContextDefinition(final ContextDefinition
contextDefinition,
+ final String destinationName) throws IOException {
+ Path destinationPath = contextsDir.resolve(destinationName);
+ if (Files.exists(destinationPath)) {
+ return;
+ }
+ Path tempPath = contextsDir.resolve(tempName(destinationName));
+ Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8),
CREATE_NEW);
+ Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+ }
+
+ private Path storeResource(final Resource resource) throws IOException {
+ final URL url = resource.getLocation();
+ final FileResolver source = FileResolver.resolve(url);
+ final String baseName = localName(source.getFileName(),
resource.getChecksum());
+ final Path destinationPath = resourcesDir.resolve(baseName);
+ final Path tempPath = resourcesDir.resolve(tempName(baseName));
+ final Path inProgressPath = resourcesDir.resolve("." + baseName +
".downloading");
+
+ if (Files.exists(destinationPath)) {
+ LOG.trace("Resource {} is already cached at {}", url, destinationPath);
+ return destinationPath;
+ }
+
+ try {
+ if (System.currentTimeMillis() -
Files.getLastModifiedTime(inProgressPath).toMillis() < 30_000
Review Comment:
We may want to make the 30s time configurable. In a thundering herd scenario
requests to download a jar file may be in a request queue waiting to be
serviced, not stuck waiting on I/O. Removing the in progress path to allow the
download to be retried (by this process or another) may make the problem worse.
Users may need a longer timeout and with this hard-coded they won't be able to
change it.
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing
downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it
references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON
files for each context,
+ * and exist primarily for user convenience (they aren't used again by this
factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for
all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file
with a unique name,
+ * and then atomically renamed, so it works correctly even if there are
multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of
`REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with
other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file
derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This
file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being
downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead
of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to
download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped
and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't
been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are
propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order
to atomically detect
+ * race collisions with other threads or processes, and to try to avoid
duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will
be returned, which
+ * point to the local files that have been downloaded for the context, in the
same order as
+ * specified in the {@link ContextDefinition} file, to be used for
constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+ private static final String PID =
Long.toString(ProcessHandle.current().pid());
+
+ private final Path contextsDir;
+ private final Path resourcesDir;
+
+ public LocalStore(final Path baseDir) throws IOException {
+ this.contextsDir =
requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+ this.resourcesDir = baseDir.resolve("resources");
+ Files.createDirectories(contextsDir);
+ Files.createDirectories(resourcesDir);
+ }
+
+ Path contextsDir() {
+ return contextsDir;
+ }
+
+ Path resourcesDir() {
+ return resourcesDir;
+ }
+
+ // pattern to match regular files that have at least one non-dot character
preceding a dot and a
+ // non-zero suffix; these files can be easily converted so the local store
retains the original
+ // file name extension, while non-matching files will not attempt to retain
the original file name
+ // extension, and will instead just append the checksum to the original file
name
+ private static Pattern fileNamesWithExtensionPattern =
Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+ static String localName(String remoteFileName, String checksum) {
+ requireNonNull(remoteFileName);
+ requireNonNull(checksum);
+ var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+ if (matcher.matches()) {
+ return String.format("%s-%s.%s", matcher.group(1), checksum,
matcher.group(2));
+ }
+ return String.format("%s-%s", remoteFileName, checksum);
+ }
+
+ private static String tempName(String baseName) {
+ return "." + requireNonNull(baseName) + "_PID" + PID + "_" +
UUID.randomUUID() + ".tmp";
+ }
+
+ public URL[] storeContextResources(final ContextDefinition
contextDefinition) throws IOException {
+ requireNonNull(contextDefinition, "definition must be supplied");
+ // use a LinkedHashSet to preserve the order of the context resources
+ final Set<Path> localFiles = new LinkedHashSet<>();
+ // store it with a .json suffix, if the original file didn't have one
+ final String origSourceName = contextDefinition.getSourceFileName();
+ final String sourceNameWithSuffix =
+ origSourceName.toLowerCase().endsWith(".json") ? origSourceName :
origSourceName + ".json";
+ final String destinationName = localName(sourceNameWithSuffix,
contextDefinition.getChecksum());
+ try {
+ storeContextDefinition(contextDefinition, destinationName);
+ boolean successful = false;
+ while (!successful) {
+ localFiles.clear();
+ for (Resource resource : contextDefinition.getResources()) {
+ Path path = storeResource(resource);
+ if (path == null) {
+ LOG.debug("Skipped resource {} while another process or thread is
downloading it",
+ resource.getLocation());
+ continue;
+ }
+ localFiles.add(path);
+ LOG.trace("Added resource {} to classpath", path);
+ }
+ successful = localFiles.size() ==
contextDefinition.getResources().size();
+ }
+
+ } catch (IOException | RuntimeException e) {
+ LOG.error("Error initializing context: " + destinationName, e);
+ throw e;
+ }
+ return localFiles.stream().map(p -> {
+ try {
+ return p.toUri().toURL();
+ } catch (MalformedURLException e) {
+ // this shouldn't happen since these are local file paths
+ throw new UncheckedIOException(e);
+ }
+ }).toArray(URL[]::new);
+ }
+
+ private void storeContextDefinition(final ContextDefinition
contextDefinition,
+ final String destinationName) throws IOException {
+ Path destinationPath = contextsDir.resolve(destinationName);
+ if (Files.exists(destinationPath)) {
+ return;
+ }
+ Path tempPath = contextsDir.resolve(tempName(destinationName));
+ Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8),
CREATE_NEW);
+ Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+ }
+
+ private Path storeResource(final Resource resource) throws IOException {
+ final URL url = resource.getLocation();
+ final FileResolver source = FileResolver.resolve(url);
+ final String baseName = localName(source.getFileName(),
resource.getChecksum());
+ final Path destinationPath = resourcesDir.resolve(baseName);
+ final Path tempPath = resourcesDir.resolve(tempName(baseName));
+ final Path inProgressPath = resourcesDir.resolve("." + baseName +
".downloading");
+
+ if (Files.exists(destinationPath)) {
+ LOG.trace("Resource {} is already cached at {}", url, destinationPath);
Review Comment:
There is a comment in the finally block about ignore exceptions when trying
to delete the in progress file. I wonder if we want to call
`Files.deleteIfExists(inProgressPath);` here before returning to try and clean
up any existing in progress file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]