Repository: nifi-minifi Updated Branches: refs/heads/master bcf6c6cb2 -> 967d40243
MINIFI-142 Upgrading NiFi dependencies to 1.1.0 and adjusting NAR and ClassLoader code to reflect changes introduced in NiFi. Signed-off-by: Joseph Percivall <jperciv...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/85d69ead Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/85d69ead Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/85d69ead Branch: refs/heads/master Commit: 85d69eadae5bffb570a28fcb4a05649d58605ce6 Parents: bcf6c6c Author: Aldrin Piri <ald...@apache.org> Authored: Tue Nov 29 16:36:46 2016 -0500 Committer: Joseph Percivall <jperciv...@apache.org> Committed: Wed Nov 30 17:13:13 2016 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/nar/ExtensionManager.java | 103 ++++++++++-- .../apache/nifi/nar/InstanceClassLoader.java | 160 +++++++++++++++++++ .../java/org/apache/nifi/nar/NarCloseable.java | 27 ++-- .../nifi/nar/NarThreadContextClassLoader.java | 22 ++- .../java/org/apache/nifi/nar/NarUnpacker.java | 28 ++-- .../java/org/apache/nifi/util/FileUtils.java | 105 ++++-------- .../pom.xml | 5 + .../MiNiFiPersistentProvenanceRepository.java | 52 +++--- ...estMiNiFiPersistentProvenanceRepository.java | 156 +++--------------- pom.xml | 2 +- 10 files changed, 385 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java index 787fb3c..9fd9e66 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java @@ -16,14 +16,8 @@ */ package org.apache.nifi.nar; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.authentication.LoginIdentityProvider; - import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.repository.ContentRepository; @@ -34,10 +28,21 @@ import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.reporting.ReportingTask; - +import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + /** * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). * @@ -53,6 +58,9 @@ public class ExtensionManager { private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>(); + private static final Set<String> requiresInstanceClassLoading = new HashSet<>(); + private static final Map<String, ClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>(); + static { definitionMap.put(Processor.class, new HashSet<>()); definitionMap.put(FlowFilePrioritizer.class, new HashSet<>()); @@ -127,6 +135,12 @@ public class ExtensionManager { if (registeredClassLoader == null) { classloaderMap.put(className, classLoader); classes.add(type); + + // keep track of which classes require a class loader per component instance + if (type.isAnnotationPresent(RequiresInstanceClassLoading.class)) { + requiresInstanceClassLoading.add(className); + } + } else { boolean loadedFromAncestor = false; @@ -159,6 +173,77 @@ public class ExtensionManager { return extensionClassloaderLookup.get(classType); } + /** + * Determines the effective ClassLoader for the instance of the given type. + * + * @param classType the type of class to lookup the ClassLoader for + * @param instanceIdentifier the identifier of the specific instance of the classType to look up the ClassLoader for + * @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type + */ + public static ClassLoader getClassLoader(final String classType, final String instanceIdentifier) { + if (StringUtils.isEmpty(classType) || StringUtils.isEmpty(instanceIdentifier)) { + throw new IllegalArgumentException("Class Type and Instance Identifier must be provided"); + } + + // Check if we already have a ClassLoader for this instance + ClassLoader instanceClassLoader = instanceClassloaderLookup.get(instanceIdentifier); + + // If we don't then we'll create a new ClassLoader for this instance and add it to the map for future lookups + if (instanceClassLoader == null) { + final ClassLoader registeredClassLoader = getClassLoader(classType); + if (registeredClassLoader == null) { + return null; + } + + // If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader + // then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty + // InstanceClassLoader that has the NAR ClassLoader as a parent + if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) { + final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader; + instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent()); + } else { + instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, new URL[0], registeredClassLoader); + } + + instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader); + } + + return instanceClassLoader; + } + + /** + * Removes the ClassLoader for the given instance and closes it if necessary. + * + * @param instanceIdentifier the identifier of a component to remove the ClassLoader for + * @return the removed ClassLoader for the given instance, or null if not found + */ + public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) { + if (instanceIdentifier == null) { + return null; + } + + final ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier); + if (classLoader != null && (classLoader instanceof URLClassLoader)) { + final URLClassLoader urlClassLoader = (URLClassLoader) classLoader; + try { + urlClassLoader.close(); + } catch (IOException e) { + logger.warn("Unable to class URLClassLoader for " + instanceIdentifier); + } + } + return classLoader; + } + + /** + * Checks if the given class type requires per-instance class loading (i.e. contains the @RequiresInstanceClassLoading annotation) + * + * @param classType the class to check + * @return true if the class is found in the set of classes requiring instance level class loading, false otherwise + */ + public static boolean requiresInstanceClassLoading(final String classType) { + return requiresInstanceClassLoading.contains(classType); + } + public static Set<Class> getExtensions(final Class<?> definition) { final Set<Class> extensions = definitionMap.get(definition); return (extensions == null) ? Collections.<Class>emptySet() : extensions; @@ -180,4 +265,4 @@ public class ExtensionManager { logger.info(builder.toString()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java new file mode 100644 index 0000000..8aff08f --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; + +/** + * A ClassLoader created for an instance of a component which lets a client add resources to an intermediary ClassLoader + * that will be checked first when loading/finding classes. + * + * Typically an instance of this ClassLoader will be created by passing in the URLs and parent from a NARClassLoader in + * order to create a copy of the NARClassLoader without modifying it. + */ +public class InstanceClassLoader extends URLClassLoader { + + private static final Logger logger = LoggerFactory.getLogger(InstanceClassLoader.class); + + private final String identifier; + private final String instanceType; + private ShimClassLoader shimClassLoader; + + /** + * @param identifier the id of the component this ClassLoader was created for + * @param urls the URLs for the ClassLoader + * @param parent the parent ClassLoader + */ + public InstanceClassLoader(final String identifier, final String type, final URL[] urls, final ClassLoader parent) { + super(urls, parent); + this.identifier = identifier; + this.instanceType = type; + } + + /** + * Initializes a new ShimClassLoader for the provided resources, closing the previous ShimClassLoader if one existed. + * + * @param urls the URLs for the ShimClassLoader + * @throws IOException if the previous ShimClassLoader existed and couldn't be closed + */ + public synchronized void setInstanceResources(final URL[] urls) { + if (shimClassLoader != null) { + try { + shimClassLoader.close(); + } catch (IOException e) { + logger.warn("Unable to close inner URLClassLoader for " + identifier); + } + } + + shimClassLoader = new ShimClassLoader(urls, getParent()); + } + + /** + * @return the URLs for the instance resources that have been set + */ + public synchronized URL[] getInstanceResources() { + if (shimClassLoader != null) { + return shimClassLoader.getURLs(); + } + return new URL[0]; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + return this.loadClass(name, false); + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + Class<?> c = null; + // first try the shim + if (shimClassLoader != null) { + try { + c = shimClassLoader.loadClass(name, resolve); + } catch (ClassNotFoundException e) { + c = null; + } + } + // if it wasn't in the shim try our self + if (c == null) { + return super.loadClass(name, resolve); + } else { + return c; + } + } + + @Override + protected Class<?> findClass(String name) throws ClassNotFoundException { + Class<?> c = null; + // first try the shim + if (shimClassLoader != null) { + try { + c = shimClassLoader.findClass(name); + } catch (ClassNotFoundException cnf) { + c = null; + } + } + // if it wasn't in the shim try our self + if (c == null) { + return super.findClass(name); + } else { + return c; + } + } + + @Override + public void close() throws IOException { + if (shimClassLoader != null) { + try { + shimClassLoader.close(); + } catch (IOException e) { + logger.warn("Unable to close inner URLClassLoader for " + identifier); + } + } + super.close(); + } + + /** + * Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate. + */ + private static class ShimClassLoader extends URLClassLoader { + + public ShimClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + + public ShimClassLoader(URL[] urls) { + super(urls); + } + + @Override + public Class<?> findClass(String name) throws ClassNotFoundException { + return super.findClass(name); + } + + @Override + public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return super.loadClass(name, resolve); + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java index 639d032..56aff9e 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -35,18 +35,25 @@ public class NarCloseable implements Closeable { } /** - * Sets the current thread context class loader to the specific appropriate - * Nar class loader for the given configurable component. Restores to the - * previous classloader once complete. If the given class is not assignable - * from ConfigurableComponent then the NarThreadContextClassLoader is used. + * Sets the current thread context class loader to the specific appropriate class loader for the given + * component. If the component requires per-instance class loading then the class loader will be the + * specific class loader for instance with the given identifier, otherwise the class loader will be + * the NARClassLoader. * - * @param componentClass componentClass - * @return NarCloseable with current thread context classloader jailed to - * the nar of the component + * @param componentClass the component class + * @param componentIdentifier the identifier of the component + * @return NarCloseable with the current thread context classloader jailed to the Nar + * or instance class loader of the component */ - public static NarCloseable withComponentNarLoader(final Class componentClass) { + public static NarCloseable withComponentNarLoader(final Class componentClass, final String componentIdentifier) { final ClassLoader current = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(componentClass.getClassLoader()); + + ClassLoader componentClassLoader = ExtensionManager.getClassLoader(componentClass.getName(), componentIdentifier); + if (componentClassLoader == null) { + componentClassLoader = componentClass.getClassLoader(); + } + + Thread.currentThread().setContextClassLoader(componentClassLoader); return new NarCloseable(current); } @@ -88,4 +95,4 @@ public class NarCloseable implements Closeable { Thread.currentThread().setContextClassLoader(toSet); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java index 5c70927..6a66ba2 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -203,14 +203,28 @@ public class NarThreadContextClassLoader extends URLClassLoader { return typeDefinition.cast(desiredClass.newInstance()); } Constructor<?> constructor = null; + try { constructor = desiredClass.getConstructor(NiFiProperties.class); - return typeDefinition.cast(constructor.newInstance(nifiProperties)); - } catch (final NoSuchMethodException | InvocationTargetException ex) { - return typeDefinition.cast(desiredClass.newInstance()); + } catch (NoSuchMethodException nsme) { + try { + constructor = desiredClass.getConstructor(); + } catch (NoSuchMethodException nsme2) { + throw new IllegalStateException("Failed to find constructor which takes NiFiProperties as argument as well as the default constructor on " + + desiredClass.getName(), nsme2); + } + } + try { + if (constructor.getParameterTypes().length == 0) { + return typeDefinition.cast(constructor.newInstance()); + } else { + return typeDefinition.cast(constructor.newInstance(nifiProperties)); + } + } catch (InvocationTargetException ite) { + throw new IllegalStateException("Failed to instantiate a component due to (see target exception)", ite); } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java index 2af1090..13108c1 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java @@ -50,7 +50,7 @@ import java.util.jar.Manifest; */ public final class NarUnpacker { - private static final Logger logger = LoggerFactory.getLogger(org.apache.nifi.nar.NarUnpacker.class); + private static final Logger logger = LoggerFactory.getLogger(NarUnpacker.class); private static String HASH_FILENAME = "nar-md5sum"; private static final FileFilter NAR_FILTER = new FileFilter() { @Override @@ -72,14 +72,16 @@ public final class NarUnpacker { final List<File> narFiles = new ArrayList<>(); // make sure the nar directories are there and accessible - FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir); - FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir); - FileUtils.ensureDirectoryExistAndCanAccess(docsWorkingDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(frameworkWorkingDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(docsWorkingDir); for (Path narLibraryDir : narLibraryDirs) { File narDir = narLibraryDir.toFile(); - FileUtils.ensureDirectoryExistAndCanAccess(narDir); + + // Test if the source NARs can be read + FileUtils.ensureDirectoryExistAndCanRead(narDir); File[] dirFiles = narDir.listFiles(NAR_FILTER); if (dirFiles != null) { @@ -170,7 +172,7 @@ public final class NarUnpacker { } private static void mapExtensions(final File workingDirectory, final File docsDirectory, - final ExtensionMapping mapping) throws IOException { + final ExtensionMapping mapping) throws IOException { final File[] directoryContents = workingDirectory.listFiles(); if (directoryContents != null) { for (final File file : directoryContents) { @@ -256,7 +258,7 @@ public final class NarUnpacker { } private static void unpackDocumentation(final File jar, final File docsDirectory, - final ExtensionMapping extensionMapping) throws IOException { + final ExtensionMapping extensionMapping) throws IOException { // determine the components that may have documentation determineDocumentedNiFiComponents(jar, extensionMapping); @@ -298,7 +300,7 @@ public final class NarUnpacker { } private static void determineDocumentedNiFiComponents(final File jar, - final ExtensionMapping extensionMapping) throws IOException { + final ExtensionMapping extensionMapping) throws IOException { try (final JarFile jarFile = new JarFile(jar)) { final JarEntry processorEntry = jarFile .getJarEntry("META-INF/services/org.apache.nifi.processor.Processor"); @@ -317,7 +319,7 @@ public final class NarUnpacker { } private static List<String> determineDocumentedNiFiComponents(final JarFile jarFile, - final JarEntry jarEntry) throws IOException { + final JarEntry jarEntry) throws IOException { final List<String> componentNames = new ArrayList<>(); if (jarEntry == null) { @@ -325,8 +327,8 @@ public final class NarUnpacker { } try (final InputStream entryInputStream = jarFile.getInputStream(jarEntry); - final BufferedReader reader = new BufferedReader(new InputStreamReader( - entryInputStream))) { + final BufferedReader reader = new BufferedReader(new InputStreamReader( + entryInputStream))) { String line; while ((line = reader.readLine()) != null) { final String trimmedLine = line.trim(); @@ -355,7 +357,7 @@ public final class NarUnpacker { */ private static void makeFile(final InputStream inputStream, final File file) throws IOException { try (final InputStream in = inputStream; - final FileOutputStream fos = new FileOutputStream(file)) { + final FileOutputStream fos = new FileOutputStream(file)) { byte[] bytes = new byte[65536]; int numRead; while ((numRead = in.read(bytes)) != -1) { @@ -393,4 +395,4 @@ public final class NarUnpacker { private NarUnpacker() { } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java index 5462f23..5e8a3c3 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java @@ -33,7 +33,13 @@ public class FileUtils { public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; + /* Superseded by renamed class bellow */ + @Deprecated public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException { + ensureDirectoryExistAndCanReadAndWrite(dir); + } + + public static void ensureDirectoryExistAndCanReadAndWrite(final File dir) throws IOException { if (dir.exists() && !dir.isDirectory()) { throw new IOException(dir.getAbsolutePath() + " is not a directory"); } else if (!dir.exists()) { @@ -47,6 +53,20 @@ public class FileUtils { } } + public static void ensureDirectoryExistAndCanRead(final File dir) throws IOException { + if (dir.exists() && !dir.isDirectory()) { + throw new IOException(dir.getAbsolutePath() + " is not a directory"); + } else if (!dir.exists()) { + final boolean made = dir.mkdirs(); + if (!made) { + throw new IOException(dir.getAbsolutePath() + " could not be created"); + } + } + if (!dir.canRead()) { + throw new IOException(dir.getAbsolutePath() + " directory does not have read privilege"); + } + } + /** * Deletes the given file. If the given file exists but could not be deleted * this will be printed as a warning to the given logger @@ -103,79 +123,8 @@ public class FileUtils { * @param directory to delete contents of * @param filter if null then no filter is used * @param logger to notify - * @deprecated As of release 0.6.0, replaced by - * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger)} - */ - @Deprecated - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) { - FileUtils.deleteFilesInDir(directory, filter, logger, false); - } - - /** - * Deletes all files (not directories) in the given directory (recursive) - * that match the given filename filter. If any file cannot be deleted then - * this is printed at warn to the given logger. - * - * @param directory to delete contents of - * @param filter if null then no filter is used - * @param logger to notify - * @param recurse true if should recurse - * @deprecated As of release 0.6.0, replaced by - * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean)} - */ - @Deprecated - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) { - FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false); - } - - /** - * Deletes all files (not directories) in the given directory (recursive) - * that match the given filename filter. If any file cannot be deleted then - * this is printed at warn to the given logger. - * - * @param directory to delete contents of - * @param filter if null then no filter is used - * @param logger to notify - * @param recurse will look for contents of sub directories. - * @param deleteEmptyDirectories default is false; if true will delete - * directories found that are empty - * @deprecated As of release 0.6.0, replaced by - * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean, boolean)} - */ - @Deprecated - public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) { - // ensure the specified directory is actually a directory and that it exists - if (null != directory && directory.isDirectory()) { - final File ingestFiles[] = directory.listFiles(); - if (ingestFiles == null) { - // null if abstract pathname does not denote a directory, or if an I/O error occurs - logger.error("Unable to list directory content in: " + directory.getAbsolutePath()); - } - for (File ingestFile : ingestFiles) { - boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName()); - if (ingestFile.isFile() && process) { - FileUtils.deleteFile(ingestFile, logger, 3); - } - if (ingestFile.isDirectory() && recurse) { - FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories); - if (deleteEmptyDirectories && ingestFile.list().length == 0) { - FileUtils.deleteFile(ingestFile, logger, 3); - } - } - } - } - } - - /** - * Deletes all files (not directories..) in the given directory (non - * recursive) that match the given filename filter. If any file cannot be - * deleted then this is printed at warn to the given logger. - * - * @param directory to delete contents of - * @param filter if null then no filter is used - * @param logger to notify - * @throws IOException if abstract pathname does not denote a directory, - * or if an I/O error occurs + * @throws IOException if abstract pathname does not denote a directory, or + * if an I/O error occurs */ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger) throws IOException { FileUtils.deleteFilesInDirectory(directory, filter, logger, false); @@ -190,8 +139,8 @@ public class FileUtils { * @param filter if null then no filter is used * @param logger to notify * @param recurse true if should recurse - * @throws IOException if abstract pathname does not denote a directory, - * or if an I/O error occurs + * @throws IOException if abstract pathname does not denote a directory, or + * if an I/O error occurs */ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) throws IOException { FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse, false); @@ -208,8 +157,8 @@ public class FileUtils { * @param recurse will look for contents of sub directories. * @param deleteEmptyDirectories default is false; if true will delete * directories found that are empty - * @throws IOException if abstract pathname does not denote a directory, - * or if an I/O error occurs + * @throws IOException if abstract pathname does not denote a directory, or + * if an I/O error occurs */ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) throws IOException { // ensure the specified directory is actually a directory and that it exists @@ -265,4 +214,4 @@ public class FileUtils { /* do nothing */ } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml index dca11b0..94108c4 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml @@ -51,5 +51,10 @@ limitations under the License. <artifactId>nifi-persistent-provenance-repository</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>provided</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java index a015dca..5ce83a6 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java @@ -16,7 +16,29 @@ */ package org.apache.nifi.provenance; -import static org.apache.nifi.provenance.toc.TocUtil.getTocFile; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.provenance.expiration.ExpirationAction; +import org.apache.nifi.provenance.expiration.FileRemovalAction; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.provenance.serialization.RecordReader; +import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.serialization.RecordWriter; +import org.apache.nifi.provenance.serialization.RecordWriters; +import org.apache.nifi.provenance.toc.TocReader; +import org.apache.nifi.provenance.toc.TocUtil; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.File; @@ -54,29 +76,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.Authorizer; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.provenance.expiration.ExpirationAction; -import org.apache.nifi.provenance.expiration.FileRemovalAction; -import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; -import org.apache.nifi.provenance.search.Query; -import org.apache.nifi.provenance.search.QuerySubmission; -import org.apache.nifi.provenance.search.SearchableField; -import org.apache.nifi.provenance.serialization.RecordReader; -import org.apache.nifi.provenance.serialization.RecordReaders; -import org.apache.nifi.provenance.serialization.RecordWriter; -import org.apache.nifi.provenance.serialization.RecordWriters; -import org.apache.nifi.provenance.toc.TocReader; -import org.apache.nifi.provenance.toc.TocUtil; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.StopWatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.nifi.provenance.toc.TocUtil.getTocFile; // TODO: When API, FlowController, and supporting classes are refactored/reimplemented migrate this class and its accompanying imports to minifi package structure @@ -320,7 +320,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceRepositor final File journalDirectory = new File(storageDirectory, "journals"); final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i); - writers[i] = RecordWriters.newRecordWriter(journalFile, false, false); + writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, false, false); writers[i].writeHeader(initialRecordId); } @@ -1367,7 +1367,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceRepositor // loop over each entry in the map, persisting the records to the merged file in order, and populating the map // with the next entry from the journal file from which the previous record was written. - try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { + try (final RecordWriter writer = RecordWriters.newSchemaRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) { writer.writeHeader(minEventId); while (!recordToReaderMap.isEmpty()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java index fbeeeb6..0dd5f65 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java @@ -16,30 +16,6 @@ */ package org.apache.nifi.provenance; -import static org.apache.nifi.provenance.TestUtil.createFlowFile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.zip.GZIPOutputStream; - import org.apache.lucene.queryparser.classic.ParseException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; @@ -57,9 +33,28 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.zip.GZIPOutputStream; + +import static org.apache.nifi.provenance.TestUtil.createFlowFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; public class TestMiNiFiPersistentProvenanceRepository { @@ -558,113 +553,6 @@ public class TestMiNiFiPersistentProvenanceRepository { assertEquals(0, reportedEvents.size()); } - - @Test - public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException { - final RepositoryConfiguration config = createConfiguration(); - config.setMaxEventFileLife(3, TimeUnit.MINUTES); - config.setJournalCount(4); - - // Create a repository that overrides the createWriters() method so that we can return writers that will throw - // OutOfMemoryError where we want to - final AtomicBoolean causeOOME = new AtomicBoolean(false); - repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { - @Override - protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException { - final RecordWriter[] recordWriters = super.createWriters(config, initialRecordId); - - // Spy on each of the writers so that a call to writeUUID throws an OutOfMemoryError if we set the - // causeOOME flag to true - final StandardRecordWriter[] spiedWriters = new StandardRecordWriter[recordWriters.length]; - for (int i = 0; i < recordWriters.length; i++) { - final StandardRecordWriter writer = (StandardRecordWriter) recordWriters[i]; - - spiedWriters[i] = Mockito.spy(writer); - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - if (causeOOME.get()) { - throw new OutOfMemoryError(); - } else { - writer.writeUUID(invocation.getArgumentAt(0, DataOutputStream.class), invocation.getArgumentAt(1, String.class)); - } - return null; - } - }).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), Mockito.any(String.class)); - } - - // return the writers that we are spying on - return spiedWriters; - } - }; - repo.initialize(getEventReporter(), null, null); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); - - final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); - builder.setEventTime(System.currentTimeMillis()); - builder.setEventType(ProvenanceEventType.RECEIVE); - builder.setTransitUri("nifi://unit-test"); - attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); - builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); - builder.setComponentId("1234"); - builder.setComponentType("dummy processor"); - - // first make sure that we are able to write to the repo successfully. - for (int i = 0; i < 4; i++) { - final ProvenanceEventRecord record = builder.build(); - repo.registerEvent(record); - } - - // cause OOME to occur - causeOOME.set(true); - - // write 4 times to make sure that we mark all partitions as dirty - for (int i = 0; i < 4; i++) { - final ProvenanceEventRecord record = builder.build(); - try { - repo.registerEvent(record); - Assert.fail("Expected OutOfMmeoryError but was able to register event"); - } catch (final OutOfMemoryError oome) { - } - } - - // now that all partitions are dirty, ensure that as we keep trying to write, we get an IllegalStateException - // and that we don't corrupt the repository by writing partial records - for (int i = 0; i < 8; i++) { - final ProvenanceEventRecord record = builder.build(); - try { - repo.registerEvent(record); - Assert.fail("Expected OutOfMmeoryError but was able to register event"); - } catch (final IllegalStateException ise) { - } - } - - // close repo so that we can create a new one to recover records - repo.close(); - - // make sure we can recover - final MiNiFiPersistentProvenanceRepository recoveryRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { - @Override - protected Set<File> recoverJournalFiles() throws IOException { - try { - return super.recoverJournalFiles(); - } catch (final IOException ioe) { - Assert.fail("Failed to recover properly"); - return null; - } - } - }; - - try { - recoveryRepo.initialize(getEventReporter(), null, null); - } finally { - recoveryRepo.close(); - } - } - - private static class ReportedEvent { private final Severity severity; private final String category; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5678666..27075db 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ limitations under the License. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <inceptionYear>2016</inceptionYear> <org.slf4j.version>1.7.12</org.slf4j.version> - <org.apache.nifi.version>1.0.0</org.apache.nifi.version> + <org.apache.nifi.version>1.1.0</org.apache.nifi.version> <logback.version>1.1.7</logback.version> <jetty.version>9.3.9.v20160517</jetty.version> <jersey.version>1.19</jersey.version>