Repository: nifi-minifi
Updated Branches:
  refs/heads/master bcf6c6cb2 -> 967d40243


MINIFI-142 Upgrading NiFi dependencies to 1.1.0 and adjusting NAR and 
ClassLoader code to reflect changes introduced in NiFi.

Signed-off-by: Joseph Percivall <jperciv...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/85d69ead
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/85d69ead
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/85d69ead

Branch: refs/heads/master
Commit: 85d69eadae5bffb570a28fcb4a05649d58605ce6
Parents: bcf6c6c
Author: Aldrin Piri <ald...@apache.org>
Authored: Tue Nov 29 16:36:46 2016 -0500
Committer: Joseph Percivall <jperciv...@apache.org>
Committed: Wed Nov 30 17:13:13 2016 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/nar/ExtensionManager.java   | 103 ++++++++++--
 .../apache/nifi/nar/InstanceClassLoader.java    | 160 +++++++++++++++++++
 .../java/org/apache/nifi/nar/NarCloseable.java  |  27 ++--
 .../nifi/nar/NarThreadContextClassLoader.java   |  22 ++-
 .../java/org/apache/nifi/nar/NarUnpacker.java   |  28 ++--
 .../java/org/apache/nifi/util/FileUtils.java    | 105 ++++--------
 .../pom.xml                                     |   5 +
 .../MiNiFiPersistentProvenanceRepository.java   |  52 +++---
 ...estMiNiFiPersistentProvenanceRepository.java | 156 +++---------------
 pom.xml                                         |   2 +-
 10 files changed, 385 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
index 787fb3c..9fd9e66 100644
--- 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
@@ -16,14 +16,8 @@
  */
 package org.apache.nifi.nar;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.authentication.LoginIdentityProvider;
-
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.repository.ContentRepository;
@@ -34,10 +28,21 @@ import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.reporting.ReportingTask;
-
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Scans through the classpath to load all FlowFileProcessors, 
FlowFileComparators, and ReportingTasks using the service provider API and 
running through all classloaders (root, NARs).
  *
@@ -53,6 +58,9 @@ public class ExtensionManager {
 
     private static final Map<String, ClassLoader> extensionClassloaderLookup = 
new HashMap<>();
 
+    private static final Set<String> requiresInstanceClassLoading = new 
HashSet<>();
+    private static final Map<String, ClassLoader> instanceClassloaderLookup = 
new ConcurrentHashMap<>();
+
     static {
         definitionMap.put(Processor.class, new HashSet<>());
         definitionMap.put(FlowFilePrioritizer.class, new HashSet<>());
@@ -127,6 +135,12 @@ public class ExtensionManager {
         if (registeredClassLoader == null) {
             classloaderMap.put(className, classLoader);
             classes.add(type);
+
+            // keep track of which classes require a class loader per 
component instance
+            if (type.isAnnotationPresent(RequiresInstanceClassLoading.class)) {
+                requiresInstanceClassLoading.add(className);
+            }
+
         } else {
             boolean loadedFromAncestor = false;
 
@@ -159,6 +173,77 @@ public class ExtensionManager {
         return extensionClassloaderLookup.get(classType);
     }
 
+    /**
+     * Determines the effective ClassLoader for the instance of the given type.
+     *
+     * @param classType the type of class to lookup the ClassLoader for
+     * @param instanceIdentifier the identifier of the specific instance of 
the classType to look up the ClassLoader for
+     * @return the ClassLoader for the given instance of the given type, or 
null if the type is not a detected extension type
+     */
+    public static ClassLoader getClassLoader(final String classType, final 
String instanceIdentifier) {
+        if (StringUtils.isEmpty(classType) || 
StringUtils.isEmpty(instanceIdentifier)) {
+            throw new IllegalArgumentException("Class Type and Instance 
Identifier must be provided");
+        }
+
+        // Check if we already have a ClassLoader for this instance
+        ClassLoader instanceClassLoader = 
instanceClassloaderLookup.get(instanceIdentifier);
+
+        // If we don't then we'll create a new ClassLoader for this instance 
and add it to the map for future lookups
+        if (instanceClassLoader == null) {
+            final ClassLoader registeredClassLoader = 
getClassLoader(classType);
+            if (registeredClassLoader == null) {
+                return null;
+            }
+
+            // If the class is annotated with @RequiresInstanceClassLoading 
and the registered ClassLoader is a URLClassLoader
+            // then make a new InstanceClassLoader that is a full copy of the 
NAR Class Loader, otherwise create an empty
+            // InstanceClassLoader that has the NAR ClassLoader as a parent
+            if (requiresInstanceClassLoading.contains(classType) && 
(registeredClassLoader instanceof URLClassLoader)) {
+                final URLClassLoader registeredUrlClassLoader = 
(URLClassLoader) registeredClassLoader;
+                instanceClassLoader = new 
InstanceClassLoader(instanceIdentifier, classType, 
registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
+            } else {
+                instanceClassLoader = new 
InstanceClassLoader(instanceIdentifier, classType, new URL[0], 
registeredClassLoader);
+            }
+
+            instanceClassloaderLookup.put(instanceIdentifier, 
instanceClassLoader);
+        }
+
+        return instanceClassLoader;
+    }
+
+    /**
+     * Removes the ClassLoader for the given instance and closes it if 
necessary.
+     *
+     * @param instanceIdentifier the identifier of a component to remove the 
ClassLoader for
+     * @return the removed ClassLoader for the given instance, or null if not 
found
+     */
+    public static ClassLoader removeInstanceClassLoaderIfExists(final String 
instanceIdentifier) {
+        if (instanceIdentifier == null) {
+            return null;
+        }
+
+        final ClassLoader classLoader = 
instanceClassloaderLookup.remove(instanceIdentifier);
+        if (classLoader != null && (classLoader instanceof URLClassLoader)) {
+            final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
+            try {
+                urlClassLoader.close();
+            } catch (IOException e) {
+                logger.warn("Unable to class URLClassLoader for " + 
instanceIdentifier);
+            }
+        }
+        return classLoader;
+    }
+
+    /**
+     * Checks if the given class type requires per-instance class loading 
(i.e. contains the @RequiresInstanceClassLoading annotation)
+     *
+     * @param classType the class to check
+     * @return true if the class is found in the set of classes requiring 
instance level class loading, false otherwise
+     */
+    public static boolean requiresInstanceClassLoading(final String classType) 
{
+        return requiresInstanceClassLoading.contains(classType);
+    }
+
     public static Set<Class> getExtensions(final Class<?> definition) {
         final Set<Class> extensions = definitionMap.get(definition);
         return (extensions == null) ? Collections.<Class>emptySet() : 
extensions;
@@ -180,4 +265,4 @@ public class ExtensionManager {
 
         logger.info(builder.toString());
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
new file mode 100644
index 0000000..8aff08f
--- /dev/null
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A ClassLoader created for an instance of a component which lets a client 
add resources to an intermediary ClassLoader
+ * that will be checked first when loading/finding classes.
+ *
+ * Typically an instance of this ClassLoader will be created by passing in the 
URLs and parent from a NARClassLoader in
+ * order to create a copy of the NARClassLoader without modifying it.
+ */
+public class InstanceClassLoader extends URLClassLoader {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(InstanceClassLoader.class);
+
+    private final String identifier;
+    private final String instanceType;
+    private ShimClassLoader shimClassLoader;
+
+    /**
+     * @param identifier the id of the component this ClassLoader was created 
for
+     * @param urls the URLs for the ClassLoader
+     * @param parent the parent ClassLoader
+     */
+    public InstanceClassLoader(final String identifier, final String type, 
final URL[] urls, final ClassLoader parent) {
+        super(urls, parent);
+        this.identifier = identifier;
+        this.instanceType = type;
+    }
+
+    /**
+     * Initializes a new ShimClassLoader for the provided resources, closing 
the previous ShimClassLoader if one existed.
+     *
+     * @param urls the URLs for the ShimClassLoader
+     * @throws IOException if the previous ShimClassLoader existed and 
couldn't be closed
+     */
+    public synchronized void setInstanceResources(final URL[] urls) {
+        if (shimClassLoader != null) {
+            try {
+                shimClassLoader.close();
+            } catch (IOException e) {
+                logger.warn("Unable to close inner URLClassLoader for " + 
identifier);
+            }
+        }
+
+        shimClassLoader = new ShimClassLoader(urls, getParent());
+    }
+
+    /**
+     * @return the URLs for the instance resources that have been set
+     */
+    public synchronized URL[] getInstanceResources() {
+        if (shimClassLoader != null) {
+            return shimClassLoader.getURLs();
+        }
+        return new URL[0];
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        return this.loadClass(name, false);
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        Class<?> c = null;
+        // first try the shim
+        if (shimClassLoader != null) {
+            try {
+                c = shimClassLoader.loadClass(name, resolve);
+            } catch (ClassNotFoundException e) {
+                c = null;
+            }
+        }
+        // if it wasn't in the shim try our self
+        if (c == null) {
+            return super.loadClass(name, resolve);
+        } else {
+            return c;
+        }
+    }
+
+    @Override
+    protected Class<?> findClass(String name) throws ClassNotFoundException {
+        Class<?> c = null;
+        // first try the shim
+        if (shimClassLoader != null) {
+            try {
+                c = shimClassLoader.findClass(name);
+            } catch (ClassNotFoundException cnf) {
+                c = null;
+            }
+        }
+        // if it wasn't in the shim try our self
+        if (c == null) {
+            return super.findClass(name);
+        } else {
+            return c;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (shimClassLoader != null) {
+            try {
+                shimClassLoader.close();
+            } catch (IOException e) {
+                logger.warn("Unable to close inner URLClassLoader for " + 
identifier);
+            }
+        }
+        super.close();
+    }
+
+    /**
+     * Extend URLClassLoader to increase visibility of protected methods so 
that InstanceClassLoader can delegate.
+     */
+    private static class ShimClassLoader extends URLClassLoader {
+
+        public ShimClassLoader(URL[] urls, ClassLoader parent) {
+            super(urls, parent);
+        }
+
+        public ShimClassLoader(URL[] urls) {
+            super(urls);
+        }
+
+        @Override
+        public Class<?> findClass(String name) throws ClassNotFoundException {
+            return super.findClass(name);
+        }
+
+        @Override
+        public Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+            return super.loadClass(name, resolve);
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
index 639d032..56aff9e 100644
--- 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -35,18 +35,25 @@ public class NarCloseable implements Closeable {
     }
 
     /**
-     * Sets the current thread context class loader to the specific appropriate
-     * Nar class loader for the given configurable component. Restores to the
-     * previous classloader once complete. If the given class is not assignable
-     * from ConfigurableComponent then the NarThreadContextClassLoader is used.
+     * Sets the current thread context class loader to the specific 
appropriate class loader for the given
+     * component. If the component requires per-instance class loading then 
the class loader will be the
+     * specific class loader for instance with the given identifier, otherwise 
the class loader will be
+     * the NARClassLoader.
      *
-     * @param componentClass componentClass
-     * @return NarCloseable with current thread context classloader jailed to
-     * the nar of the component
+     * @param componentClass the component class
+     * @param componentIdentifier the identifier of the component
+     * @return NarCloseable with the current thread context classloader jailed 
to the Nar
+     *              or instance class loader of the component
      */
-    public static NarCloseable withComponentNarLoader(final Class 
componentClass) {
+    public static NarCloseable withComponentNarLoader(final Class 
componentClass, final String componentIdentifier) {
         final ClassLoader current = 
Thread.currentThread().getContextClassLoader();
-        
Thread.currentThread().setContextClassLoader(componentClass.getClassLoader());
+
+        ClassLoader componentClassLoader = 
ExtensionManager.getClassLoader(componentClass.getName(), componentIdentifier);
+        if (componentClassLoader == null) {
+            componentClassLoader = componentClass.getClassLoader();
+        }
+
+        Thread.currentThread().setContextClassLoader(componentClassLoader);
         return new NarCloseable(current);
     }
 
@@ -88,4 +95,4 @@ public class NarCloseable implements Closeable {
             Thread.currentThread().setContextClassLoader(toSet);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
index 5c70927..6a66ba2 100644
--- 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
@@ -203,14 +203,28 @@ public class NarThreadContextClassLoader extends 
URLClassLoader {
                 return typeDefinition.cast(desiredClass.newInstance());
             }
             Constructor<?> constructor = null;
+
             try {
                 constructor = 
desiredClass.getConstructor(NiFiProperties.class);
-                return 
typeDefinition.cast(constructor.newInstance(nifiProperties));
-            } catch (final NoSuchMethodException | InvocationTargetException 
ex) {
-                return typeDefinition.cast(desiredClass.newInstance());
+            } catch (NoSuchMethodException nsme) {
+                try {
+                    constructor = desiredClass.getConstructor();
+                } catch (NoSuchMethodException nsme2) {
+                    throw new IllegalStateException("Failed to find 
constructor which takes NiFiProperties as argument as well as the default 
constructor on "
+                            + desiredClass.getName(), nsme2);
+                }
+            }
+            try {
+                if (constructor.getParameterTypes().length == 0) {
+                    return typeDefinition.cast(constructor.newInstance());
+                } else {
+                    return 
typeDefinition.cast(constructor.newInstance(nifiProperties));
+                }
+            } catch (InvocationTargetException ite) {
+                throw new IllegalStateException("Failed to instantiate a 
component due to (see target exception)", ite);
             }
         } finally {
             Thread.currentThread().setContextClassLoader(originalClassLoader);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
index 2af1090..13108c1 100644
--- 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
@@ -50,7 +50,7 @@ import java.util.jar.Manifest;
  */
 public final class NarUnpacker {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(org.apache.nifi.nar.NarUnpacker.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(NarUnpacker.class);
     private static String HASH_FILENAME = "nar-md5sum";
     private static final FileFilter NAR_FILTER = new FileFilter() {
         @Override
@@ -72,14 +72,16 @@ public final class NarUnpacker {
             final List<File> narFiles = new ArrayList<>();
 
             // make sure the nar directories are there and accessible
-            FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir);
-            FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir);
-            FileUtils.ensureDirectoryExistAndCanAccess(docsWorkingDir);
+            
FileUtils.ensureDirectoryExistAndCanReadAndWrite(frameworkWorkingDir);
+            
FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(docsWorkingDir);
 
             for (Path narLibraryDir : narLibraryDirs) {
 
                 File narDir = narLibraryDir.toFile();
-                FileUtils.ensureDirectoryExistAndCanAccess(narDir);
+
+                // Test if the source NARs can be read
+                FileUtils.ensureDirectoryExistAndCanRead(narDir);
 
                 File[] dirFiles = narDir.listFiles(NAR_FILTER);
                 if (dirFiles != null) {
@@ -170,7 +172,7 @@ public final class NarUnpacker {
     }
 
     private static void mapExtensions(final File workingDirectory, final File 
docsDirectory,
-            final ExtensionMapping mapping) throws IOException {
+                                      final ExtensionMapping mapping) throws 
IOException {
         final File[] directoryContents = workingDirectory.listFiles();
         if (directoryContents != null) {
             for (final File file : directoryContents) {
@@ -256,7 +258,7 @@ public final class NarUnpacker {
     }
 
     private static void unpackDocumentation(final File jar, final File 
docsDirectory,
-            final ExtensionMapping extensionMapping) throws IOException {
+                                            final ExtensionMapping 
extensionMapping) throws IOException {
         // determine the components that may have documentation
         determineDocumentedNiFiComponents(jar, extensionMapping);
 
@@ -298,7 +300,7 @@ public final class NarUnpacker {
     }
 
     private static void determineDocumentedNiFiComponents(final File jar,
-            final ExtensionMapping extensionMapping) throws IOException {
+                                                          final 
ExtensionMapping extensionMapping) throws IOException {
         try (final JarFile jarFile = new JarFile(jar)) {
             final JarEntry processorEntry = jarFile
                     
.getJarEntry("META-INF/services/org.apache.nifi.processor.Processor");
@@ -317,7 +319,7 @@ public final class NarUnpacker {
     }
 
     private static List<String> determineDocumentedNiFiComponents(final 
JarFile jarFile,
-            final JarEntry jarEntry) throws IOException {
+                                                                  final 
JarEntry jarEntry) throws IOException {
         final List<String> componentNames = new ArrayList<>();
 
         if (jarEntry == null) {
@@ -325,8 +327,8 @@ public final class NarUnpacker {
         }
 
         try (final InputStream entryInputStream = 
jarFile.getInputStream(jarEntry);
-                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(
-                        entryInputStream))) {
+             final BufferedReader reader = new BufferedReader(new 
InputStreamReader(
+                     entryInputStream))) {
             String line;
             while ((line = reader.readLine()) != null) {
                 final String trimmedLine = line.trim();
@@ -355,7 +357,7 @@ public final class NarUnpacker {
      */
     private static void makeFile(final InputStream inputStream, final File 
file) throws IOException {
         try (final InputStream in = inputStream;
-                final FileOutputStream fos = new FileOutputStream(file)) {
+             final FileOutputStream fos = new FileOutputStream(file)) {
             byte[] bytes = new byte[65536];
             int numRead;
             while ((numRead = in.read(bytes)) != -1) {
@@ -393,4 +395,4 @@ public final class NarUnpacker {
 
     private NarUnpacker() {
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
index 5462f23..5e8a3c3 100644
--- 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
+++ 
b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
@@ -33,7 +33,13 @@ public class FileUtils {
 
     public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
 
+    /* Superseded by renamed class bellow */
+    @Deprecated
     public static void ensureDirectoryExistAndCanAccess(final File dir) throws 
IOException {
+        ensureDirectoryExistAndCanReadAndWrite(dir);
+    }
+
+    public static void ensureDirectoryExistAndCanReadAndWrite(final File dir) 
throws IOException {
         if (dir.exists() && !dir.isDirectory()) {
             throw new IOException(dir.getAbsolutePath() + " is not a 
directory");
         } else if (!dir.exists()) {
@@ -47,6 +53,20 @@ public class FileUtils {
         }
     }
 
+    public static void ensureDirectoryExistAndCanRead(final File dir) throws 
IOException {
+        if (dir.exists() && !dir.isDirectory()) {
+            throw new IOException(dir.getAbsolutePath() + " is not a 
directory");
+        } else if (!dir.exists()) {
+            final boolean made = dir.mkdirs();
+            if (!made) {
+                throw new IOException(dir.getAbsolutePath() + " could not be 
created");
+            }
+        }
+        if (!dir.canRead()) {
+            throw new IOException(dir.getAbsolutePath() + " directory does not 
have read privilege");
+        }
+    }
+
     /**
      * Deletes the given file. If the given file exists but could not be 
deleted
      * this will be printed as a warning to the given logger
@@ -103,79 +123,8 @@ public class FileUtils {
      * @param directory to delete contents of
      * @param filter if null then no filter is used
      * @param logger to notify
-     * @deprecated As of release 0.6.0, replaced by
-     * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger)}
-     */
-    @Deprecated
-    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger) {
-        FileUtils.deleteFilesInDir(directory, filter, logger, false);
-    }
-
-    /**
-     * Deletes all files (not directories) in the given directory (recursive)
-     * that match the given filename filter. If any file cannot be deleted then
-     * this is printed at warn to the given logger.
-     *
-     * @param directory to delete contents of
-     * @param filter if null then no filter is used
-     * @param logger to notify
-     * @param recurse true if should recurse
-     * @deprecated As of release 0.6.0, replaced by
-     * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean)}
-     */
-    @Deprecated
-    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse) {
-        FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
-    }
-
-    /**
-     * Deletes all files (not directories) in the given directory (recursive)
-     * that match the given filename filter. If any file cannot be deleted then
-     * this is printed at warn to the given logger.
-     *
-     * @param directory to delete contents of
-     * @param filter if null then no filter is used
-     * @param logger to notify
-     * @param recurse will look for contents of sub directories.
-     * @param deleteEmptyDirectories default is false; if true will delete
-     * directories found that are empty
-     * @deprecated As of release 0.6.0, replaced by
-     * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean, 
boolean)}
-     */
-    @Deprecated
-    public static void deleteFilesInDir(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse, final 
boolean deleteEmptyDirectories) {
-        // ensure the specified directory is actually a directory and that it 
exists
-        if (null != directory && directory.isDirectory()) {
-            final File ingestFiles[] = directory.listFiles();
-            if (ingestFiles == null) {
-                // null if abstract pathname does not denote a directory, or 
if an I/O error occurs
-                logger.error("Unable to list directory content in: " + 
directory.getAbsolutePath());
-            }
-            for (File ingestFile : ingestFiles) {
-                boolean process = (filter == null) ? true : 
filter.accept(directory, ingestFile.getName());
-                if (ingestFile.isFile() && process) {
-                    FileUtils.deleteFile(ingestFile, logger, 3);
-                }
-                if (ingestFile.isDirectory() && recurse) {
-                    FileUtils.deleteFilesInDir(ingestFile, filter, logger, 
recurse, deleteEmptyDirectories);
-                    if (deleteEmptyDirectories && ingestFile.list().length == 
0) {
-                        FileUtils.deleteFile(ingestFile, logger, 3);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Deletes all files (not directories..) in the given directory (non
-     * recursive) that match the given filename filter. If any file cannot be
-     * deleted then this is printed at warn to the given logger.
-     *
-     * @param directory to delete contents of
-     * @param filter if null then no filter is used
-     * @param logger to notify
-     * @throws IOException if abstract pathname does not denote a directory,
-     * or if an I/O error occurs
+     * @throws IOException if abstract pathname does not denote a directory, or
+     * if an I/O error occurs
      */
     public static void deleteFilesInDirectory(final File directory, final 
FilenameFilter filter, final Logger logger) throws IOException {
         FileUtils.deleteFilesInDirectory(directory, filter, logger, false);
@@ -190,8 +139,8 @@ public class FileUtils {
      * @param filter if null then no filter is used
      * @param logger to notify
      * @param recurse true if should recurse
-     * @throws IOException if abstract pathname does not denote a directory,
-     * or if an I/O error occurs
+     * @throws IOException if abstract pathname does not denote a directory, or
+     * if an I/O error occurs
      */
     public static void deleteFilesInDirectory(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse) throws 
IOException {
         FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse, 
false);
@@ -208,8 +157,8 @@ public class FileUtils {
      * @param recurse will look for contents of sub directories.
      * @param deleteEmptyDirectories default is false; if true will delete
      * directories found that are empty
-     * @throws IOException if abstract pathname does not denote a directory,
-     * or if an I/O error occurs
+     * @throws IOException if abstract pathname does not denote a directory, or
+     * if an I/O error occurs
      */
     public static void deleteFilesInDirectory(final File directory, final 
FilenameFilter filter, final Logger logger, final boolean recurse, final 
boolean deleteEmptyDirectories) throws IOException {
         // ensure the specified directory is actually a directory and that it 
exists
@@ -265,4 +214,4 @@ public class FileUtils {
             /* do nothing */
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
index dca11b0..94108c4 100644
--- 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
+++ 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
@@ -51,5 +51,10 @@ limitations under the License.
             <artifactId>nifi-persistent-provenance-repository</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
index a015dca..5ce83a6 100644
--- 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
+++ 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java
@@ -16,7 +16,29 @@
  */
 package org.apache.nifi.provenance;
 
-import static org.apache.nifi.provenance.toc.TocUtil.getTocFile;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.expiration.ExpirationAction;
+import org.apache.nifi.provenance.expiration.FileRemovalAction;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.serialization.RecordWriters;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.File;
@@ -54,29 +76,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.provenance.expiration.ExpirationAction;
-import org.apache.nifi.provenance.expiration.FileRemovalAction;
-import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
-import org.apache.nifi.provenance.search.Query;
-import org.apache.nifi.provenance.search.QuerySubmission;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.serialization.RecordWriters;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.nifi.provenance.toc.TocUtil.getTocFile;
 
 
 // TODO: When API, FlowController, and supporting classes are 
refactored/reimplemented migrate this class and its accompanying imports to 
minifi package structure
@@ -320,7 +320,7 @@ public class MiNiFiPersistentProvenanceRepository 
implements ProvenanceRepositor
             final File journalDirectory = new File(storageDirectory, 
"journals");
             final File journalFile = new File(journalDirectory, 
String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newRecordWriter(journalFile, false, 
false);
+            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, 
false, false);
             writers[i].writeHeader(initialRecordId);
         }
 
@@ -1367,7 +1367,7 @@ public class MiNiFiPersistentProvenanceRepository 
implements ProvenanceRepositor
 
             // loop over each entry in the map, persisting the records to the 
merged file in order, and populating the map
             // with the next entry from the journal file from which the 
previous record was written.
-            try (final RecordWriter writer = 
RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), 
true)) {
+            try (final RecordWriter writer = 
RecordWriters.newSchemaRecordWriter(writerFile, 
configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader(minEventId);
 
                 while (!recordToReaderMap.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
index fbeeeb6..0dd5f65 100644
--- 
a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
+++ 
b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
@@ -16,30 +16,6 @@
  */
 package org.apache.nifi.provenance;
 
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPOutputStream;
-
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
@@ -57,9 +33,28 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPOutputStream;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class TestMiNiFiPersistentProvenanceRepository {
 
@@ -558,113 +553,6 @@ public class TestMiNiFiPersistentProvenanceRepository {
         assertEquals(0, reportedEvents.size());
     }
 
-
-    @Test
-    public void testBehaviorOnOutOfMemory() throws IOException, 
InterruptedException {
-        final RepositoryConfiguration config = createConfiguration();
-        config.setMaxEventFileLife(3, TimeUnit.MINUTES);
-        config.setJournalCount(4);
-
-        // Create a repository that overrides the createWriters() method so 
that we can return writers that will throw
-        // OutOfMemoryError where we want to
-        final AtomicBoolean causeOOME = new AtomicBoolean(false);
-        repo = new MiNiFiPersistentProvenanceRepository(config, 
DEFAULT_ROLLOVER_MILLIS) {
-            @Override
-            protected RecordWriter[] createWriters(RepositoryConfiguration 
config, long initialRecordId) throws IOException {
-                final RecordWriter[] recordWriters = 
super.createWriters(config, initialRecordId);
-
-                // Spy on each of the writers so that a call to writeUUID 
throws an OutOfMemoryError if we set the
-                // causeOOME flag to true
-                final StandardRecordWriter[] spiedWriters = new 
StandardRecordWriter[recordWriters.length];
-                for (int i = 0; i < recordWriters.length; i++) {
-                    final StandardRecordWriter writer = (StandardRecordWriter) 
recordWriters[i];
-
-                    spiedWriters[i] = Mockito.spy(writer);
-                    Mockito.doAnswer(new Answer<Object>() {
-                        @Override
-                        public Object answer(final InvocationOnMock 
invocation) throws Throwable {
-                            if (causeOOME.get()) {
-                                throw new OutOfMemoryError();
-                            } else {
-                                writer.writeUUID(invocation.getArgumentAt(0, 
DataOutputStream.class), invocation.getArgumentAt(1, String.class));
-                            }
-                            return null;
-                        }
-                    
}).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), 
Mockito.any(String.class));
-                }
-
-                // return the writers that we are spying on
-                return spiedWriters;
-            }
-        };
-        repo.initialize(getEventReporter(), null, null);
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("75chars", 
"123456789012345678901234567890123456789012345678901234567890123456789012345");
-
-        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
-        builder.setEventTime(System.currentTimeMillis());
-        builder.setEventType(ProvenanceEventType.RECEIVE);
-        builder.setTransitUri("nifi://unit-test");
-        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
-        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
-
-        // first make sure that we are able to write to the repo successfully.
-        for (int i = 0; i < 4; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            repo.registerEvent(record);
-        }
-
-        // cause OOME to occur
-        causeOOME.set(true);
-
-        // write 4 times to make sure that we mark all partitions as dirty
-        for (int i = 0; i < 4; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            try {
-                repo.registerEvent(record);
-                Assert.fail("Expected OutOfMmeoryError but was able to 
register event");
-            } catch (final OutOfMemoryError oome) {
-            }
-        }
-
-        // now that all partitions are dirty, ensure that as we keep trying to 
write, we get an IllegalStateException
-        // and that we don't corrupt the repository by writing partial records
-        for (int i = 0; i < 8; i++) {
-            final ProvenanceEventRecord record = builder.build();
-            try {
-                repo.registerEvent(record);
-                Assert.fail("Expected OutOfMmeoryError but was able to 
register event");
-            } catch (final IllegalStateException ise) {
-            }
-        }
-
-        // close repo so that we can create a new one to recover records
-        repo.close();
-
-        // make sure we can recover
-        final MiNiFiPersistentProvenanceRepository recoveryRepo = new 
MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
-            @Override
-            protected Set<File> recoverJournalFiles() throws IOException {
-                try {
-                    return super.recoverJournalFiles();
-                } catch (final IOException ioe) {
-                    Assert.fail("Failed to recover properly");
-                    return null;
-                }
-            }
-        };
-
-        try {
-            recoveryRepo.initialize(getEventReporter(), null, null);
-        } finally {
-            recoveryRepo.close();
-        }
-    }
-
-
     private static class ReportedEvent {
         private final Severity severity;
         private final String category;

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/85d69ead/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5678666..27075db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@ limitations under the License.
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <inceptionYear>2016</inceptionYear>
         <org.slf4j.version>1.7.12</org.slf4j.version>
-        <org.apache.nifi.version>1.0.0</org.apache.nifi.version>
+        <org.apache.nifi.version>1.1.0</org.apache.nifi.version>
         <logback.version>1.1.7</logback.version>
         <jetty.version>9.3.9.v20160517</jetty.version>
         <jersey.version>1.19</jersey.version>

Reply via email to