ddanielr commented on code in PR #30: URL: https://github.com/apache/accumulo-classloaders/pull/30#discussion_r2379754908
########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/manifest/Resource.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.classloader.lcc.manifest; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Objects; + +public class Resource { + + private final String location; + private final String checksum; + + public Resource(String location, String checksum) { + super(); Review Comment: Is this call to super needed? ```suggestion ``` ########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/manifest/Manifest.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.classloader.lcc.manifest; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; + +import org.apache.accumulo.classloader.lcc.Constants; + +public class Manifest { + private final int monitorIntervalSeconds; + private final Map<String,ContextDefinition> contexts; + private volatile transient byte[] checksum = null; + + public Manifest(int monitorIntervalSeconds, Map<String,ContextDefinition> contexts) { + super(); Review Comment: Is this call to `super` needed? ```suggestion ``` ########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloaders/lcc/state/ContextClassLoader.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.classloaders.lcc.state; + +import java.io.File; +import java.lang.ref.WeakReference; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.classloader.lcc.Constants; +import org.apache.accumulo.classloader.lcc.cache.CacheUtils; +import org.apache.accumulo.classloader.lcc.manifest.ContextDefinition; +import org.apache.accumulo.classloader.lcc.manifest.Manifest; +import org.apache.accumulo.classloader.lcc.manifest.Resource; +import org.apache.accumulo.classloader.lcc.resolvers.FileResolver; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContextClassLoader { + + public static class ClassPathElement { + private final FileResolver remote; + private final URL localCachedCopy; + private final String localCachedCopyDigest; + + public ClassPathElement(FileResolver remote, URL localCachedCopy, + String localCachedCopyDigest) { + super(); + this.remote = remote; + this.localCachedCopy = localCachedCopy; + this.localCachedCopyDigest = localCachedCopyDigest; + } + + public FileResolver getRemote() { + return remote; + } + + public URL getLocalCachedCopy() { + return localCachedCopy; + } + + public String getLocalCachedCopyDigest() { + return localCachedCopyDigest; + } + } + + private 
ClassPathElement cacheResource(final Resource resource) throws Exception { + + final DigestUtils digest = new DigestUtils("MD5"); + final FileResolver source = FileResolver.resolve(resource.getURL()); + final Path cacheLocation = + contextCacheDir.resolve(source.getFileName() + "_" + resource.getChecksum()); + final File cacheFile = cacheLocation.toFile(); + if (!Files.exists(cacheLocation)) { + Files.copy(source.getInputStream(), cacheLocation); + String md5 = digest.digestAsHex(cacheFile); + if (!resource.getChecksum().equals(digest)) { + // What we just wrote does not match the Manifest. Review Comment: How should this mismatch be resolved? Should the file that was just created be deleted? ########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.classloader.lcc; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.classloader.lcc.cache.CacheUtils; +import org.apache.accumulo.classloader.lcc.manifest.Manifest; +import org.apache.accumulo.classloader.lcc.resolvers.FileResolver; +import org.apache.accumulo.classloaders.lcc.state.Contexts; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A ContextClassLoaderFactory implementation that does the following creates and maintains a + * ClassLoader for a named context. This factory expects the system property + * {@code Constants#MANIFEST_URL_PROPERTY} to be set to the URL of a json formatted manifest file. + * The manifest file contains an interval at which this class should monitor the manifest file for + * changes and a mapping of context names to ContextDefinitions. Each ContextDefinition contains a + * monitoring interval and a list of resources. Each resource is defined by a URL to the file and an + * expected MD5 hash value. + * + * The URLs supplied for the manifest file and for the resources can use one of the following + * protocols: file://, http://, or hdfs://. + * + * As this class processes the ContextDefinitions it fetches the contents of the resource from the + * resource URL and caches it in a directory on the local filesystem. This class uses the value of + * thesystem property {@code Constants#CACHE_DIR_PROPERTY} as the root directory and creates a + * subdirectory for each context name. 
Each context cache directory contains a lock file and a copy + * of each fetched resource that is named using the following format: + * fileName_md5Hash.fileNameSuffix. + * + * The lock file prevents processes from manipulating the contexts of the context cache directory + * concurrently, which enables the cache directories to be shared among multiple processes on the + * host. + * + * Note that because the cache directory is shared among multiple processes, and one process can't + * know what the other processes are doing, this class cannot clean up the shared cache directory. + * It is left to the user to remove unused context cache directories and unused old files within a + * context cache directory. + * + */ +public class LocalCachingContextClassLoaderFactory implements ContextClassLoaderFactory { + + private static final Logger LOG = + LoggerFactory.getLogger(LocalCachingContextClassLoaderFactory.class); + + private final AtomicReference<URL> manifestLocation = new AtomicReference<>(); + private final AtomicReference<Manifest> manifest = new AtomicReference<>(); + private final Contexts contexts = new Contexts(manifest); + + private Manifest parseManifest(URL url) throws ContextClassLoaderException { + LOG.trace("Retrieving manifest file from {}", url); + FileResolver resolver = FileResolver.resolve(url); + try { + try (InputStream is = resolver.getInputStream()) { + return Constants.GSON.fromJson(new InputStreamReader(is), Manifest.class); + } + } catch (IOException e) { + throw new ContextClassLoaderException("Error reading manifest file: " + resolver.getURL(), e); + } + } + + private URL getAndMonitorManifest() throws ContextClassLoaderException { + final String manifestPropValue = System.getProperty(Constants.MANIFEST_URL_PROPERTY); + if (manifestPropValue == null) { + throw new ContextClassLoaderException( + "System property " + Constants.MANIFEST_URL_PROPERTY + " not set."); + } + try { + final URL url = new URL(manifestPropValue); + final Manifest m = 
parseManifest(url); + manifest.compareAndSet(null, m); + contexts.update(); + Constants.EXECUTOR.scheduleWithFixedDelay(() -> { + try { + final AtomicBoolean updateRequired = new AtomicBoolean(false); + final Manifest mUpdate = parseManifest(manifestLocation.get()); + manifest.getAndAccumulate(mUpdate, (curr, update) -> { + try { + // If the Manifest file has not changed, then continue to use + // the current Manifest. If it has changed, then update the + // Contexts and use the new one. + if (Arrays.equals(curr.getChecksum(), update.getChecksum())) { + LOG.trace("Manifest file has not changed"); + return curr; + } else { + LOG.debug("Manifest file has changed, updating contexts"); + updateRequired.set(true); + return update; + } + } catch (NoSuchAlgorithmException e) { + LOG.error( + "Error computing checksum during manifest update, retaining current manifest", e); + return curr; + } + }); + if (updateRequired.get()) { + contexts.update(); + } + } catch (Exception e) { + LOG.error("Error parsing manifest at " + url); Review Comment: ```suggestion LOG.error("Error parsing manifest at {}", url); ``` ########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloaders/lcc/state/ContextClassLoader.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.classloaders.lcc.state; + +import java.io.File; +import java.lang.ref.WeakReference; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.classloader.lcc.Constants; +import org.apache.accumulo.classloader.lcc.cache.CacheUtils; +import org.apache.accumulo.classloader.lcc.manifest.ContextDefinition; +import org.apache.accumulo.classloader.lcc.manifest.Manifest; +import org.apache.accumulo.classloader.lcc.manifest.Resource; +import org.apache.accumulo.classloader.lcc.resolvers.FileResolver; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContextClassLoader { + + public static class ClassPathElement { + private final FileResolver remote; + private final URL localCachedCopy; + private final String localCachedCopyDigest; + + public ClassPathElement(FileResolver remote, URL localCachedCopy, + String localCachedCopyDigest) { + super(); + this.remote = remote; + this.localCachedCopy = localCachedCopy; + this.localCachedCopyDigest = localCachedCopyDigest; + } + + public FileResolver getRemote() { + return remote; + } + + public URL getLocalCachedCopy() { + return localCachedCopy; + } + + public String getLocalCachedCopyDigest() { + return localCachedCopyDigest; + } + } + + private 
ClassPathElement cacheResource(final Resource resource) throws Exception { + + final DigestUtils digest = new DigestUtils("MD5"); + final FileResolver source = FileResolver.resolve(resource.getURL()); + final Path cacheLocation = + contextCacheDir.resolve(source.getFileName() + "_" + resource.getChecksum()); + final File cacheFile = cacheLocation.toFile(); + if (!Files.exists(cacheLocation)) { + Files.copy(source.getInputStream(), cacheLocation); + String md5 = digest.digestAsHex(cacheFile); + if (!resource.getChecksum().equals(digest)) { Review Comment: This compares a string against a DigestUtils object. Was this supposed to use the `md5` value from line 86? ########## modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloaders/lcc/state/Contexts.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.classloaders.lcc.state; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.classloader.lcc.Constants; +import org.apache.accumulo.classloader.lcc.manifest.ContextDefinition; +import org.apache.accumulo.classloader.lcc.manifest.Manifest; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Contexts { + + private static final Logger LOG = LoggerFactory.getLogger(Contexts.class); + + private final AtomicReference<Manifest> manifest; + private final Map<String,ContextClassLoader> contexts = new HashMap<>(); + private final AtomicBoolean updating = new AtomicBoolean(true); + + public Contexts(AtomicReference<Manifest> manifest) { + this.manifest = manifest; + } + + public synchronized void update() { + LOG.debug("Updating all contexts using Manifest"); + updating.set(true); + final Map<String,ContextDefinition> ctx = manifest.get().getContexts(); + final List<String> removals = new ArrayList<>(); + contexts.keySet().forEach(k -> { + if (!ctx.containsKey(k)) { + removals.add(k); + } + }); + + removals.forEach(r -> { + LOG.debug("Context {} is no longer contained in the Manifest, removing", r); + contexts.get(r).clear(); + contexts.remove(r); + }); + + final List<Future<?>> futures = new ArrayList<>(); + for (Entry<String,ContextDefinition> e : ctx.entrySet()) { + if (contexts.get(e.getKey()) == null) { + // This is a newly defined context + LOG.debug("Context {} is new in the Manifest, creating new ContextClassLoader", e.getKey()); + try { + 
ContextClassLoader ccl = new ContextClassLoader(manifest, e.getKey()); + contexts.put(e.getKey(), ccl); + futures.add(Constants.EXECUTOR.submit(() -> ccl.initialize())); + } catch (ContextClassLoaderException e1) { + LOG.error("Error creating new ContextClassLoader for context: " + e.getKey(), e1); Review Comment: ```suggestion LOG.error("Error creating new ContextClassLoader for context: {}", e.getKey(), e1); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
