ctubbsii commented on code in PR #35:
URL: https://github.com/apache/accumulo-classloaders/pull/35#discussion_r2677363581
##########
modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.classloader.lcc.Constants;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.resolvers.FileResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple storage service backed by a local file system for storing downloaded
+ * {@link ContextDefinition} files and the {@link Resource} objects it references.
+ * <p>
+ * The layout of the storage area consists of two directories:
+ * <ul>
+ * <li><b>contexts</b> stores a copy of the {@link ContextDefinition} JSON files for each context,
+ * and exist primarily for user convenience (they aren't used again by this factory)
+ * <li><b>resources</b> stores a copy of all the {@link Resource} files for all contexts
+ * </ul>
+ *
+ * <p>
+ * When downloading any file, the file is first downloaded to a temporary file with a unique name,
+ * and then atomically renamed, so it works correctly even if there are multiple processes or
+ * threads doing the same thing. The use of `CREATE_NEW` and the lack of `REPLACE_EXISTING` when
+ * creating the temporary files is to ensure we do not silently collide with other processes, when
+ * these temporary files should be unique.
+ *
+ * <p>
+ * When downloading resource files, an additional "in-progress" signal file derived from the name of
+ * the file being downloaded with the suffix ".downloading" is also used. This file is updated with
+ * the current process' PID every 5 seconds so long as the file is still being downloaded. This
+ * serves as a signal to other processes or threads that they can wait instead of attempting to
+ * download the same file. This avoids duplicate work. If an attempt to download a resource file is
+ * detected, and it has been updated within the last 30 seconds, it is skipped and the remaining
+ * files are downloaded before attempting it again. If this signal file hasn't been updated in the
+ * last 30 seconds, a download will be attempted. Failures to download are propagated up to the
+ * higher level. `CREATE_NEW` is used when creating this signal file, in order to atomically detect
+ * race collisions with other threads or processes, and to try to avoid duplicate work.
+ *
+ * <p>
+ * Once all files for a context are downloaded, an array of {@link URL}s will be returned, which
+ * point to the local files that have been downloaded for the context, in the same order as
+ * specified in the {@link ContextDefinition} file, to be used for constructing a
+ * {@link URLClassLoader}.
+ */
+public final class LocalStore {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class);
+  private static final String PID = Long.toString(ProcessHandle.current().pid());
+
+  private final Path contextsDir;
+  private final Path resourcesDir;
+
+  public LocalStore(final Path baseDir) throws IOException {
+    this.contextsDir = requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
+    this.resourcesDir = baseDir.resolve("resources");
+    Files.createDirectories(contextsDir);
+    Files.createDirectories(resourcesDir);
+  }
+
+  Path contextsDir() {
+    return contextsDir;
+  }
+
+  Path resourcesDir() {
+    return resourcesDir;
+  }
+
+  // pattern to match regular files that have at least one non-dot character preceding a dot and a
+  // non-zero suffix; these files can be easily converted so the local store retains the original
+  // file name extension, while non-matching files will not attempt to retain the original file name
+  // extension, and will instead just append the checksum to the original file name
+  private static Pattern fileNamesWithExtensionPattern = Pattern.compile("^(.*[^.].*)[.]([^.]+)$");
+
+  static String localName(String remoteFileName, String checksum) {
+    requireNonNull(remoteFileName);
+    requireNonNull(checksum);
+    var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
+    if (matcher.matches()) {
+      return String.format("%s-%s.%s", matcher.group(1), checksum, matcher.group(2));
+    }
+    return String.format("%s-%s", remoteFileName, checksum);
+  }
+
+  private static String tempName(String baseName) {
+    return "." + requireNonNull(baseName) + "_PID" + PID + "_" + UUID.randomUUID() + ".tmp";
+  }
+
+  public URL[] storeContextResources(final ContextDefinition contextDefinition) throws IOException {
+    requireNonNull(contextDefinition, "definition must be supplied");
+    // use a LinkedHashSet to preserve the order of the context resources
+    final Set<Path> localFiles = new LinkedHashSet<>();
+    // store it with a .json suffix, if the original file didn't have one
+    final String origSourceName = contextDefinition.getSourceFileName();
+    final String sourceNameWithSuffix =
+        origSourceName.toLowerCase().endsWith(".json") ? origSourceName : origSourceName + ".json";
+    final String destinationName = localName(sourceNameWithSuffix, contextDefinition.getChecksum());
+    try {
+      storeContextDefinition(contextDefinition, destinationName);
+      boolean successful = false;
+      while (!successful) {
+        localFiles.clear();
+        for (Resource resource : contextDefinition.getResources()) {
+          Path path = storeResource(resource);
+          if (path == null) {
+            LOG.debug("Skipped resource {} while another process or thread is downloading it",
+                resource.getLocation());
+            continue;
+          }
+          localFiles.add(path);
+          LOG.trace("Added resource {} to classpath", path);
+        }
+        successful = localFiles.size() == contextDefinition.getResources().size();
+      }
+
+    } catch (IOException | RuntimeException e) {
+      LOG.error("Error initializing context: " + destinationName, e);
+      throw e;
+    }
+    return localFiles.stream().map(p -> {
+      try {
+        return p.toUri().toURL();
+      } catch (MalformedURLException e) {
+        // this shouldn't happen since these are local file paths
+        throw new UncheckedIOException(e);
+      }
+    }).toArray(URL[]::new);
+  }
+
+  private void storeContextDefinition(final ContextDefinition contextDefinition,
+      final String destinationName) throws IOException {
+    Path destinationPath = contextsDir.resolve(destinationName);
+    if (Files.exists(destinationPath)) {
+      return;
+    }
+    Path tempPath = contextsDir.resolve(tempName(destinationName));
+    Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8), CREATE_NEW);
+    Files.move(tempPath, destinationPath, ATOMIC_MOVE);
+  }
+
+  private Path storeResource(final Resource resource) throws IOException {
+    final URL url = resource.getLocation();
+    final FileResolver source = FileResolver.resolve(url);
+    final String baseName = localName(source.getFileName(), resource.getChecksum());
+    final Path destinationPath = resourcesDir.resolve(baseName);
+    final Path tempPath = resourcesDir.resolve(tempName(baseName));
+    final Path inProgressPath = resourcesDir.resolve("." + baseName + ".downloading");
+
+    if (Files.exists(destinationPath)) {
+      LOG.trace("Resource {} is already cached at {}", url, destinationPath);
+      return destinationPath;
+    }
+
+    try {
+      if (System.currentTimeMillis() - Files.getLastModifiedTime(inProgressPath).toMillis() < 30_000
+          || Files.deleteIfExists(inProgressPath)) {
+        return null;
+      }
+    } catch (NoSuchFileException e) {
+      // this is okay, nobody else is downloading the file, so we can try
+    }
+
+    try {
+      Files.write(inProgressPath, PID.getBytes(UTF_8), CREATE_NEW);
+    } catch (FileAlreadyExistsException e) {
+      // somebody else beat us to it, let them try to download it; we'll check back later
+      return null;
+    }
+
+    var task = new FutureTask<Void>(() -> downloadFile(source, tempPath, resource), null);
+    var t = new Thread(task);
+    t.setDaemon(true);
+    t.setName("downloading " + url + " to " + tempPath);
+
+    LOG.trace("Storing remote resource {} locally at {} via temp file {}", url, destinationPath,
+        tempPath);
+    t.start();
+    try {
+      while (!task.isDone()) {
+        try {
+          Files.write(inProgressPath, PID.getBytes(UTF_8), TRUNCATE_EXISTING);
+        } catch (IOException e) {
+          LOG.warn(
+              "Error writing progress file {}. Other processes may attempt downloading the same file.",
+              inProgressPath, e);
+        }
+        try {
+          task.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+          // timeout while waiting for task to complete; do nothing; keep waiting
+          LOG.trace("Still making progress downloading {}", tempPath);
+        } catch (InterruptedException e) {
+          task.cancel(true);
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(
+              "Thread was interrupted while waiting on resource to copy from " + url + " to "
+                  + tempPath,
+              e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException("Error copying resource from " + url + " to " + tempPath,
+              e);
+        }
+      }
+

Review Comment:
   The only case where we can get here with the thread still alive is if there's an
   InterruptedException or a RuntimeException in the main thread and `task.cancel(true)` didn't
   have an effect because the task was stuck on I/O or something. I don't think this is really
   very likely, but if it does happen, we want to make progress on other files and not be stuck
   indefinitely. The thread should finish at some point, and it's a daemon thread, so it's not
   going to prevent the process from exiting; I think it's safe to just log and move on. If we
   start seeing this occur, we can investigate the causes, because I suspect it will indicate a
   different, probably unrelated, problem.
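
Editorial note: as a minimal sketch of the "log and move on" handling the comment describes (this is not code from the PR), assuming the daemon download thread and its `FutureTask` from the hunk above; the helper class, method name, and log message are hypothetical.

```java
import java.net.URL;

import org.slf4j.Logger;

// Hypothetical helper, not part of the PR: sketches the "log and move on" handling
// described in the review comment for a download thread that is still alive.
final class DownloadWatchdog {
  private DownloadWatchdog() {}

  static void logIfStillRunning(Thread downloadThread, URL url, Logger log) {
    if (downloadThread.isAlive()) {
      // task.cancel(true) may have had no effect if the download was blocked on
      // non-interruptible I/O; because the thread is a daemon thread it cannot keep
      // the JVM from exiting, so it is safe to log and continue with the remaining
      // resource files instead of joining indefinitely.
      log.warn("Download of {} still running on thread {}; continuing with other resources", url,
          downloadThread.getName());
    }
  }
}
```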
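Separately, a hypothetical usage sketch of the class as its Javadoc describes it (also not code from the PR): the `URL[]` returned by `storeContextResources()` is intended to back a `URLClassLoader`. The cache directory and the way the `ContextDefinition` is obtained are assumptions.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
import org.apache.accumulo.classloader.lcc.util.LocalStore;

// Illustrative only: wires LocalStore output into a URLClassLoader for one context.
class LocalStoreUsageSketch {
  static URLClassLoader classLoaderFor(ContextDefinition contextDefinition) throws IOException {
    // the cache directory is an arbitrary example path
    LocalStore store = new LocalStore(Path.of("/var/lib/accumulo/lcc-cache"));
    URL[] classpath = store.storeContextResources(contextDefinition);
    // delegating to this class's loader as the parent is an illustrative choice
    return new URLClassLoader(classpath, LocalStoreUsageSketch.class.getClassLoader());
  }
}
```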
