http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java index 9fd9e66..b80207b 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java @@ -19,14 +19,22 @@ package org.apache.nifi.nar; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.authentication.LoginIdentityProvider; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.init.ConfigurableComponentInitializer; +import org.apache.nifi.init.ConfigurableComponentInitializerFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.provenance.ProvenanceRepository; +import 
org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.util.StringUtils; import org.slf4j.Logger; @@ -35,13 +43,17 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs). @@ -56,10 +68,13 @@ public class ExtensionManager { // Maps a service definition (interface) to those classes that implement the interface private static final Map<Class, Set<Class>> definitionMap = new HashMap<>(); - private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>(); + private static final Map<String, List<Bundle>> classNameBundleLookup = new HashMap<>(); + private static final Map<BundleCoordinate, Bundle> bundleCoordinateBundleLookup = new HashMap<>(); + private static final Map<ClassLoader, Bundle> classLoaderBundleLookup = new HashMap<>(); + private static final Map<String, ConfigurableComponent> tempComponentLookup = new HashMap<>(); - private static final Set<String> requiresInstanceClassLoading = new HashSet<>(); - private static final Map<String, ClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>(); + private static final Map<String, Class<?>> requiresInstanceClassLoading = new HashMap<>(); + private static final Map<String, InstanceClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>(); static { definitionMap.put(Processor.class, new HashSet<>()); @@ -73,28 +88,33 @@ public class 
ExtensionManager { definitionMap.put(FlowFileRepository.class, new HashSet<>()); definitionMap.put(FlowFileSwapManager.class, new HashSet<>()); definitionMap.put(ContentRepository.class, new HashSet<>()); + definitionMap.put(StateProvider.class, new HashSet<>()); } /** * Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath. - * @param extensionLoaders the loaders to scan through in search of extensions + * + * @param narBundles the bundles to scan through in search of extensions */ - public static void discoverExtensions(final Set<ClassLoader> extensionLoaders) { - final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader(); - + public static void discoverExtensions(final Bundle systemBundle, final Set<Bundle> narBundles) { // get the current context class loader ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); - // consider the system class loader - loadExtensions(systemClassLoader); + // load the system bundle first so that any extensions found in JARs directly in lib will be registered as + // being from the system bundle and not from all the other NARs + loadExtensions(systemBundle); + bundleCoordinateBundleLookup.put(systemBundle.getBundleDetails().getCoordinate(), systemBundle); // consider each nar class loader - for (final ClassLoader ncl : extensionLoaders) { - + for (final Bundle bundle : narBundles) { // Must set the context class loader to the nar classloader itself // so that static initialization techniques that depend on the context class loader will work properly + final ClassLoader ncl = bundle.getClassLoader(); Thread.currentThread().setContextClassLoader(ncl); - loadExtensions(ncl); + loadExtensions(bundle); + + // Create a look-up from coordinate to bundle + bundleCoordinateBundleLookup.put(bundle.getBundleDetails().getCoordinate(), bundle); } // restore the 
current context class loader if appropriate @@ -104,160 +124,409 @@ public class ExtensionManager { } /** - * Loads extensions from the specified class loader. + * Loads extensions from the specified bundle. * - * @param classLoader from which to load extensions + * @param bundle from which to load extensions */ @SuppressWarnings("unchecked") - private static void loadExtensions(final ClassLoader classLoader) { + private static void loadExtensions(final Bundle bundle) { for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) { - final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), classLoader); + final boolean isControllerService = ControllerService.class.equals(entry.getKey()); + final boolean isProcessor = Processor.class.equals(entry.getKey()); + final boolean isReportingTask = ReportingTask.class.equals(entry.getKey()); + final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), bundle.getClassLoader()); for (final Object o : serviceLoader) { - registerServiceClass(o.getClass(), extensionClassloaderLookup, classLoader, entry.getValue()); + // create a cache of temp ConfigurableComponent instances, the initialize here has to happen before the checks below + if ((isControllerService || isProcessor || isReportingTask) && o instanceof ConfigurableComponent) { + final ConfigurableComponent configurableComponent = (ConfigurableComponent) o; + initializeTempComponent(configurableComponent); + + final String cacheKey = getClassBundleKey(o.getClass().getCanonicalName(), bundle.getBundleDetails().getCoordinate()); + tempComponentLookup.put(cacheKey, (ConfigurableComponent) o); + } + + // only consider extensions discovered directly in this bundle + boolean registerExtension = bundle.getClassLoader().equals(o.getClass().getClassLoader()); + + if (registerExtension) { + final Class extensionType = o.getClass(); + if (isControllerService && !checkControllerServiceEligibility(extensionType)) { + registerExtension = 
false; + logger.error(String.format( + "Skipping Controller Service %s because it is bundled with its supporting APIs and requires instance class loading.", extensionType.getName())); + } + + final boolean canReferenceControllerService = (isControllerService || isProcessor || isReportingTask) && o instanceof ConfigurableComponent; + if (canReferenceControllerService && !checkControllerServiceReferenceEligibility((ConfigurableComponent) o, bundle.getClassLoader())) { + registerExtension = false; + logger.error(String.format( + "Skipping component %s because it is bundled with its referenced Controller Service APIs and requires instance class loading.", extensionType.getName())); + } + + if (registerExtension) { + registerServiceClass(o.getClass(), classNameBundleLookup, bundle, entry.getValue()); + } + } + + } + + classLoaderBundleLookup.put(bundle.getClassLoader(), bundle); + } + } + + private static void initializeTempComponent(final ConfigurableComponent configurableComponent) { + ConfigurableComponentInitializer initializer = null; + try { + initializer = ConfigurableComponentInitializerFactory.createComponentInitializer(configurableComponent.getClass()); + initializer.initialize(configurableComponent); + } catch (final InitializationException e) { + logger.warn(String.format("Unable to initialize component %s due to %s", configurableComponent.getClass().getName(), e.getMessage())); + } + } + + private static boolean checkControllerServiceReferenceEligibility(final ConfigurableComponent component, final ClassLoader classLoader) { + // if the extension does not require instance classloading, its eligible + final boolean requiresInstanceClassLoading = component.getClass().isAnnotationPresent(RequiresInstanceClassLoading.class); + + final Set<Class> cobundledApis = new HashSet<>(); + try (final NarCloseable closeable = NarCloseable.withComponentNarLoader(component.getClass().getClassLoader())) { + final List<PropertyDescriptor> descriptors = 
component.getPropertyDescriptors(); + if (descriptors != null && !descriptors.isEmpty()) { + for (final PropertyDescriptor descriptor : descriptors) { + final Class<? extends ControllerService> serviceApi = descriptor.getControllerServiceDefinition(); + if (serviceApi != null && classLoader.equals(serviceApi.getClassLoader())) { + cobundledApis.add(serviceApi); + } + } + } + } + + if (!cobundledApis.isEmpty()) { + logger.warn(String.format( + "Component %s is bundled with its referenced Controller Service APIs %s. The service APIs should not be bundled with component implementations that reference it.", + component.getClass().getName(), StringUtils.join(cobundledApis.stream().map(cls -> cls.getName()).collect(Collectors.toSet()), ", "))); + } + + // the component is eligible when it does not require instance classloading or when the supporting APIs are bundled in a parent NAR + return requiresInstanceClassLoading == false || cobundledApis.isEmpty(); + } + + private static boolean checkControllerServiceEligibility(Class extensionType) { + final Class originalExtensionType = extensionType; + final ClassLoader originalExtensionClassLoader = extensionType.getClassLoader(); + + // if the extension does not require instance classloading, its eligible + final boolean requiresInstanceClassLoading = extensionType.isAnnotationPresent(RequiresInstanceClassLoading.class); + + final Set<Class> cobundledApis = new HashSet<>(); + while (extensionType != null) { + for (final Class i : extensionType.getInterfaces()) { + if (originalExtensionClassLoader.equals(i.getClassLoader())) { + cobundledApis.add(i); + } } + + extensionType = extensionType.getSuperclass(); + } + + if (!cobundledApis.isEmpty()) { + logger.warn(String.format("Controller Service %s is bundled with its supporting APIs %s. 
The service APIs should not be bundled with the implementations.", + originalExtensionType.getName(), StringUtils.join(cobundledApis.stream().map(cls -> cls.getName()).collect(Collectors.toSet()), ", "))); } + + // the service is eligible when it does not require instance classloading or when the supporting APIs are bundled in a parent NAR + return requiresInstanceClassLoading == false || cobundledApis.isEmpty(); } /** - * Registers extension for the specified type from the specified ClassLoader. + * Registers extension for the specified type from the specified Bundle. * - * @param type the extension type - * @param classloaderMap mapping of classname to classloader - * @param classLoader the classloader being mapped to - * @param classes to map to this classloader but which come from its ancestors + * @param type the extension type + * @param classNameBundleMap mapping of classname to Bundle + * @param bundle the Bundle being mapped to + * @param classes to map to this classloader but which come from its ancestors */ - private static void registerServiceClass(final Class<?> type, final Map<String, ClassLoader> classloaderMap, final ClassLoader classLoader, final Set<Class> classes) { + private static void registerServiceClass(final Class<?> type, final Map<String, List<Bundle>> classNameBundleMap, final Bundle bundle, final Set<Class> classes) { final String className = type.getName(); - final ClassLoader registeredClassLoader = classloaderMap.get(className); - // see if this class is already registered (this should happen when the class is loaded by an ancestor of the specified classloader) - if (registeredClassLoader == null) { - classloaderMap.put(className, classLoader); - classes.add(type); + // get the bundles that have already been registered for the class name + List<Bundle> registeredBundles = classNameBundleMap.get(className); - // keep track of which classes require a class loader per component instance - if 
(type.isAnnotationPresent(RequiresInstanceClassLoading.class)) { - requiresInstanceClassLoading.add(className); + if (registeredBundles == null) { + registeredBundles = new ArrayList<>(); + classNameBundleMap.put(className, registeredBundles); + } + + boolean alreadyRegistered = false; + for (final Bundle registeredBundle : registeredBundles) { + final BundleCoordinate registeredCoordinate = registeredBundle.getBundleDetails().getCoordinate(); + + // if the incoming bundle has the same coordinate as one of the registered bundles then consider it already registered + if (registeredCoordinate.equals(bundle.getBundleDetails().getCoordinate())) { + alreadyRegistered = true; + break; } - } else { - boolean loadedFromAncestor = false; - - // determine if this class was loaded from an ancestor - ClassLoader ancestorClassLoader = classLoader.getParent(); - while (ancestorClassLoader != null) { - if (ancestorClassLoader == registeredClassLoader) { - loadedFromAncestor = true; - break; - } - ancestorClassLoader = ancestorClassLoader.getParent(); + // if the type wasn't loaded from an ancestor, and the type isn't a processor, cs, or reporting task, then + // fail registration because we don't support multiple versions of any other types + if (!multipleVersionsAllowed(type)) { + throw new IllegalStateException("Attempt was made to load " + className + " from " + + bundle.getBundleDetails().getCoordinate().getCoordinate() + + " but that class name is already loaded/registered from " + registeredBundle.getBundleDetails().getCoordinate() + + " and multiple versions are not supported for this type" + ); } + } - // if this class was loaded from a non ancestor class loader, report potential unexpected behavior - if (!loadedFromAncestor) { - logger.warn("Attempt was made to load " + className + " from " + classLoader - + " but that class name is already loaded/registered from " + registeredClassLoader - + ". This may cause unpredictable behavior. 
Order of NARs is not guaranteed."); + // if none of the above was true then register the new bundle + if (!alreadyRegistered) { + registeredBundles.add(bundle); + classes.add(type); + + if (type.isAnnotationPresent(RequiresInstanceClassLoading.class)) { + final String cacheKey = getClassBundleKey(className, bundle.getBundleDetails().getCoordinate()); + requiresInstanceClassLoading.put(cacheKey, type); } } + } /** - * Determines the effective classloader for classes of the given type. If returns null it indicates the given type is not known or was not detected. - * - * @param classType to lookup the classloader of - * @return String of fully qualified class name; null if not a detected type + * @param type a Class that we found from a service loader + * @return true if the given class is a processor, controller service, or reporting task */ - public static ClassLoader getClassLoader(final String classType) { - return extensionClassloaderLookup.get(classType); + private static boolean multipleVersionsAllowed(Class<?> type) { + return Processor.class.isAssignableFrom(type) || ControllerService.class.isAssignableFrom(type) || ReportingTask.class.isAssignableFrom(type); } /** * Determines the effective ClassLoader for the instance of the given type. 
* - * @param classType the type of class to lookup the ClassLoader for + * @param classType the type of class to lookup the ClassLoader for * @param instanceIdentifier the identifier of the specific instance of the classType to look up the ClassLoader for + * @param bundle the bundle where the classType exists + * @param additionalUrls additional URLs to add to the instance class loader * @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type */ - public static ClassLoader getClassLoader(final String classType, final String instanceIdentifier) { - if (StringUtils.isEmpty(classType) || StringUtils.isEmpty(instanceIdentifier)) { - throw new IllegalArgumentException("Class Type and Instance Identifier must be provided"); + public static InstanceClassLoader createInstanceClassLoader(final String classType, final String instanceIdentifier, final Bundle bundle, final Set<URL> additionalUrls) { + if (StringUtils.isEmpty(classType)) { + throw new IllegalArgumentException("Class-Type is required"); } - // Check if we already have a ClassLoader for this instance - ClassLoader instanceClassLoader = instanceClassloaderLookup.get(instanceIdentifier); + if (StringUtils.isEmpty(instanceIdentifier)) { + throw new IllegalArgumentException("Instance Identifier is required"); + } + + if (bundle == null) { + throw new IllegalArgumentException("Bundle is required"); + } - // If we don't then we'll create a new ClassLoader for this instance and add it to the map for future lookups - if (instanceClassLoader == null) { - final ClassLoader registeredClassLoader = getClassLoader(classType); - if (registeredClassLoader == null) { - return null; + // If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader + // then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty + // InstanceClassLoader that has the NAR ClassLoader as a 
parent + + InstanceClassLoader instanceClassLoader; + final ClassLoader bundleClassLoader = bundle.getClassLoader(); + final String key = getClassBundleKey(classType, bundle.getBundleDetails().getCoordinate()); + + if (requiresInstanceClassLoading.containsKey(key) && bundleClassLoader instanceof NarClassLoader) { + final Class<?> type = requiresInstanceClassLoading.get(key); + final RequiresInstanceClassLoading requiresInstanceClassLoading = type.getAnnotation(RequiresInstanceClassLoading.class); + + final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader; + logger.debug("Including ClassLoader resources from {} for component {}", new Object[]{bundle.getBundleDetails(), instanceIdentifier}); + + final Set<URL> instanceUrls = new LinkedHashSet<>(); + for (final URL url : narBundleClassLoader.getURLs()) { + instanceUrls.add(url); } - // If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader - // then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty - // InstanceClassLoader that has the NAR ClassLoader as a parent - if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) { - final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader; - instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent()); - } else { - instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, new URL[0], registeredClassLoader); + ClassLoader ancestorClassLoader = narBundleClassLoader.getParent(); + + if (requiresInstanceClassLoading.cloneAncestorResources()) { + final ConfigurableComponent component = getTempComponent(classType, bundle.getBundleDetails().getCoordinate()); + final Set<BundleCoordinate> reachableApiBundles = findReachableApiBundles(component); + + while 
(ancestorClassLoader != null && ancestorClassLoader instanceof NarClassLoader) { + final Bundle ancestorNarBundle = classLoaderBundleLookup.get(ancestorClassLoader); + + // stop including ancestor resources when we reach one of the APIs + if (ancestorNarBundle == null || reachableApiBundles.contains(ancestorNarBundle.getBundleDetails().getCoordinate())) { + break; + } + + final NarClassLoader ancestorNarClassLoader = (NarClassLoader) ancestorClassLoader; + for (final URL url : ancestorNarClassLoader.getURLs()) { + instanceUrls.add(url); + } + ancestorClassLoader = ancestorNarClassLoader.getParent(); + } } - instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader); + instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, instanceUrls, additionalUrls, ancestorClassLoader); + } else { + instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, bundleClassLoader); + } + + if (logger.isTraceEnabled()) { + for (URL url : instanceClassLoader.getURLs()) { + logger.trace("URL resource {} for {}...", new Object[]{url.toExternalForm(), instanceIdentifier}); + } } + instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader); return instanceClassLoader; } /** - * Removes the ClassLoader for the given instance and closes it if necessary. + * Find the bundle coordinates for any service APIs that are referenced by this component and not part of the same bundle. 
* - * @param instanceIdentifier the identifier of a component to remove the ClassLoader for - * @return the removed ClassLoader for the given instance, or null if not found + * @param component the component being instantiated */ - public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) { + protected static Set<BundleCoordinate> findReachableApiBundles(final ConfigurableComponent component) { + final Set<BundleCoordinate> reachableApiBundles = new HashSet<>(); + + try (final NarCloseable closeable = NarCloseable.withComponentNarLoader(component.getClass().getClassLoader())) { + final List<PropertyDescriptor> descriptors = component.getPropertyDescriptors(); + if (descriptors != null && !descriptors.isEmpty()) { + for (final PropertyDescriptor descriptor : descriptors) { + final Class<? extends ControllerService> serviceApi = descriptor.getControllerServiceDefinition(); + if (serviceApi != null && !component.getClass().getClassLoader().equals(serviceApi.getClassLoader())) { + final Bundle apiBundle = classLoaderBundleLookup.get(serviceApi.getClassLoader()); + reachableApiBundles.add(apiBundle.getBundleDetails().getCoordinate()); + } + } + } + } + + return reachableApiBundles; + } + + /** + * Retrieves the InstanceClassLoader for the component with the given identifier. + * + * @param instanceIdentifier the identifier of a component + * @return the instance class loader for the component + */ + public static InstanceClassLoader getInstanceClassLoader(final String instanceIdentifier) { + return instanceClassloaderLookup.get(instanceIdentifier); + } + + /** + * Removes the InstanceClassLoader for a given component. 
+ * + * @param instanceIdentifier the of a component + */ + public static InstanceClassLoader removeInstanceClassLoader(final String instanceIdentifier) { if (instanceIdentifier == null) { return null; } - final ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier); + final InstanceClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier); + closeURLClassLoader(instanceIdentifier, classLoader); + return classLoader; + } + + /** + * Closes the given ClassLoader if it is an instance of URLClassLoader. + * + * @param instanceIdentifier the instance id the class loader corresponds to + * @param classLoader the class loader to close + */ + public static void closeURLClassLoader(final String instanceIdentifier, final ClassLoader classLoader) { if (classLoader != null && (classLoader instanceof URLClassLoader)) { final URLClassLoader urlClassLoader = (URLClassLoader) classLoader; try { urlClassLoader.close(); } catch (IOException e) { - logger.warn("Unable to class URLClassLoader for " + instanceIdentifier); + logger.warn("Unable to close URLClassLoader for " + instanceIdentifier); } } - return classLoader; } /** - * Checks if the given class type requires per-instance class loading (i.e. contains the @RequiresInstanceClassLoading annotation) + * Retrieves the bundles that have a class with the given name. + * + * @param classType the class name of an extension + * @return the list of bundles that contain an extension with the given class name + */ + public static List<Bundle> getBundles(final String classType) { + if (classType == null) { + throw new IllegalArgumentException("Class type cannot be null"); + } + final List<Bundle> bundles = classNameBundleLookup.get(classType); + return bundles == null ? Collections.emptyList() : new ArrayList<>(bundles); + } + + /** + * Retrieves the bundle with the given coordinate. 
+ * + * @param bundleCoordinate a coordinate to look up + * @return the bundle with the given coordinate, or null if none exists + */ + public static Bundle getBundle(final BundleCoordinate bundleCoordinate) { + if (bundleCoordinate == null) { + throw new IllegalArgumentException("BundleCoordinate cannot be null"); + } + return bundleCoordinateBundleLookup.get(bundleCoordinate); + } + + /** + * Retrieves the bundle for the given class loader. * - * @param classType the class to check - * @return true if the class is found in the set of classes requiring instance level class loading, false otherwise + * @param classLoader the class loader to look up the bundle for + * @return the bundle for the given class loader */ - public static boolean requiresInstanceClassLoading(final String classType) { - return requiresInstanceClassLoading.contains(classType); + public static Bundle getBundle(final ClassLoader classLoader) { + if (classLoader == null) { + throw new IllegalArgumentException("ClassLoader cannot be null"); + } + return classLoaderBundleLookup.get(classLoader); } public static Set<Class> getExtensions(final Class<?> definition) { + if (definition == null) { + throw new IllegalArgumentException("Class cannot be null"); + } final Set<Class> extensions = definitionMap.get(definition); return (extensions == null) ? 
Collections.<Class>emptySet() : extensions; } + public static ConfigurableComponent getTempComponent(final String classType, final BundleCoordinate bundleCoordinate) { + if (classType == null) { + throw new IllegalArgumentException("Class type cannot be null"); + } + + if (bundleCoordinate == null) { + throw new IllegalArgumentException("Bundle Coordinate cannot be null"); + } + + return tempComponentLookup.get(getClassBundleKey(classType, bundleCoordinate)); + } + + private static String getClassBundleKey(final String classType, final BundleCoordinate bundleCoordinate) { + return classType + "_" + bundleCoordinate.getCoordinate(); + } + public static void logClassLoaderMapping() { final StringBuilder builder = new StringBuilder(); - builder.append("Extension Type Mapping to Classloader:"); + builder.append("Extension Type Mapping to Bundle:"); for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) { - builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" type || Classloader ==="); + builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" Type ==="); for (final Class type : entry.getValue()) { - builder.append("\n\t").append(type.getName()).append(" || ").append(getClassLoader(type.getName())); + final List<Bundle> bundles = classNameBundleLookup.containsKey(type.getName()) + ? classNameBundleLookup.get(type.getName()) : Collections.emptyList(); + + builder.append("\n\t").append(type.getName()); + + for (final Bundle bundle : bundles) { + final String coordinate = bundle.getBundleDetails().getCoordinate().getCoordinate(); + final String workingDir = bundle.getBundleDetails().getWorkingDirectory().getPath(); + builder.append("\n\t\t").append(coordinate).append(" || ").append(workingDir); + } } builder.append("\n\t=== End ").append(entry.getKey().getSimpleName()).append(" types ===");
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java index 8aff08f..d9e23fa 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java @@ -19,16 +19,17 @@ package org.apache.nifi.nar; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; /** - * A ClassLoader created for an instance of a component which lets a client add resources to an intermediary ClassLoader - * that will be checked first when loading/finding classes. + * Each processor, controller service, and reporting task will have an InstanceClassLoader. * - * Typically an instance of this ClassLoader will be created by passing in the URLs and parent from a NARClassLoader in - * order to create a copy of the NARClassLoader without modifying it. + * The InstanceClassLoader will either be an empty pass-through to the NARClassLoader, or will contain a + * copy of all the NAR's resources in the case of components that @RequireInstanceClassLoading. 
*/ public class InstanceClassLoader extends URLClassLoader { @@ -36,125 +37,53 @@ public class InstanceClassLoader extends URLClassLoader { private final String identifier; private final String instanceType; - private ShimClassLoader shimClassLoader; + + private final Set<URL> instanceUrls; + private final Set<URL> additionalResourceUrls; /** * @param identifier the id of the component this ClassLoader was created for - * @param urls the URLs for the ClassLoader + * @param instanceUrls the urls for the instance, will either be empty or a copy of the NARs urls + * @param additionalResourceUrls the urls that came from runtime properties of the component * @param parent the parent ClassLoader */ - public InstanceClassLoader(final String identifier, final String type, final URL[] urls, final ClassLoader parent) { - super(urls, parent); + public InstanceClassLoader(final String identifier, final String type, final Set<URL> instanceUrls, final Set<URL> additionalResourceUrls, final ClassLoader parent) { + super(combineURLs(instanceUrls, additionalResourceUrls), parent); this.identifier = identifier; this.instanceType = type; + this.instanceUrls = Collections.unmodifiableSet( + instanceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(instanceUrls)); + this.additionalResourceUrls = Collections.unmodifiableSet( + additionalResourceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(additionalResourceUrls)); } - /** - * Initializes a new ShimClassLoader for the provided resources, closing the previous ShimClassLoader if one existed. 
- * - * @param urls the URLs for the ShimClassLoader - * @throws IOException if the previous ShimClassLoader existed and couldn't be closed - */ - public synchronized void setInstanceResources(final URL[] urls) { - if (shimClassLoader != null) { - try { - shimClassLoader.close(); - } catch (IOException e) { - logger.warn("Unable to close inner URLClassLoader for " + identifier); - } - } + private static URL[] combineURLs(final Set<URL> instanceUrls, final Set<URL> additionalResourceUrls) { + final Set<URL> allUrls = new LinkedHashSet<>(); - shimClassLoader = new ShimClassLoader(urls, getParent()); - } + if (instanceUrls != null) { + allUrls.addAll(instanceUrls); + } - /** - * @return the URLs for the instance resources that have been set - */ - public synchronized URL[] getInstanceResources() { - if (shimClassLoader != null) { - return shimClassLoader.getURLs(); + if (additionalResourceUrls != null) { + allUrls.addAll(additionalResourceUrls); } - return new URL[0]; - } - @Override - public Class<?> loadClass(String name) throws ClassNotFoundException { - return this.loadClass(name, false); + return allUrls.toArray(new URL[allUrls.size()]); } - @Override - protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - Class<?> c = null; - // first try the shim - if (shimClassLoader != null) { - try { - c = shimClassLoader.loadClass(name, resolve); - } catch (ClassNotFoundException e) { - c = null; - } - } - // if it wasn't in the shim try our self - if (c == null) { - return super.loadClass(name, resolve); - } else { - return c; - } + public String getIdentifier() { + return identifier; } - @Override - protected Class<?> findClass(String name) throws ClassNotFoundException { - Class<?> c = null; - // first try the shim - if (shimClassLoader != null) { - try { - c = shimClassLoader.findClass(name); - } catch (ClassNotFoundException cnf) { - c = null; - } - } - // if it wasn't in the shim try our self - if (c == null) { - return 
super.findClass(name); - } else { - return c; - } + public String getInstanceType() { + return instanceType; } - @Override - public void close() throws IOException { - if (shimClassLoader != null) { - try { - shimClassLoader.close(); - } catch (IOException e) { - logger.warn("Unable to close inner URLClassLoader for " + identifier); - } - } - super.close(); + public Set<URL> getInstanceUrls() { + return instanceUrls; } - /** - * Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate. - */ - private static class ShimClassLoader extends URLClassLoader { - - public ShimClassLoader(URL[] urls, ClassLoader parent) { - super(urls, parent); - } - - public ShimClassLoader(URL[] urls) { - super(urls); - } - - @Override - public Class<?> findClass(String name) throws ClassNotFoundException { - return super.findClass(name); - } - - @Override - public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - return super.loadClass(name, resolve); - } - + public Set<URL> getAdditionalResourceUrls() { + return additionalResourceUrls; } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java new file mode 100644 index 0000000..62afb37 --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarBundleUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.bundle.BundleDetails; +import org.apache.nifi.util.StringUtils; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.jar.Attributes; +import java.util.jar.Manifest; + +public class NarBundleUtil { + + /** + * Creates a BundleDetails from the given NAR working directory. 
+ * + * @param narDirectory the directory of an exploded NAR which contains a META-INF/MANIFEST.MF + * + * @return the BundleDetails constructed from the information in META-INF/MANIFEST.MF + */ + public static BundleDetails fromNarDirectory(final File narDirectory) throws IOException, IllegalStateException { + if (narDirectory == null) { + throw new IllegalArgumentException("NAR Directory cannot be null"); + } + + final File manifestFile = new File(narDirectory, "META-INF/MANIFEST.MF"); + try (final FileInputStream fis = new FileInputStream(manifestFile)) { + final Manifest manifest = new Manifest(fis); + final Attributes attributes = manifest.getMainAttributes(); + + final BundleDetails.Builder builder = new BundleDetails.Builder(); + builder.workingDir(narDirectory); + + final String group = attributes.getValue(NarManifestEntry.NAR_GROUP.getManifestName()); + final String id = attributes.getValue(NarManifestEntry.NAR_ID.getManifestName()); + final String version = attributes.getValue(NarManifestEntry.NAR_VERSION.getManifestName()); + builder.coordinate(new BundleCoordinate(group, id, version)); + + final String dependencyGroup = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_GROUP.getManifestName()); + final String dependencyId = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_ID.getManifestName()); + final String dependencyVersion = attributes.getValue(NarManifestEntry.NAR_DEPENDENCY_VERSION.getManifestName()); + if (!StringUtils.isBlank(dependencyId)) { + builder.dependencyCoordinate(new BundleCoordinate(dependencyGroup, dependencyId, dependencyVersion)); + } + + builder.buildBranch(attributes.getValue(NarManifestEntry.BUILD_BRANCH.getManifestName())); + builder.buildTag(attributes.getValue(NarManifestEntry.BUILD_TAG.getManifestName())); + builder.buildRevision(attributes.getValue(NarManifestEntry.BUILD_REVISION.getManifestName())); + builder.buildTimestamp(attributes.getValue(NarManifestEntry.BUILD_TIMESTAMP.getManifestName())); + 
builder.buildJdk(attributes.getValue(NarManifestEntry.BUILD_JDK.getManifestName())); + builder.builtBy(attributes.getValue(NarManifestEntry.BUILT_BY.getManifestName())); + + return builder.build(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java index 9219721..7e8ba89 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java @@ -16,8 +16,14 @@ */ package org.apache.nifi.nar; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.bundle.BundleDetails; +import org.apache.nifi.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -28,14 +34,10 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.jar.Attributes; -import java.util.jar.Manifest; -import org.apache.nifi.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + /** - * + * A singleton class used to initialize the extension and framework classloaders. 
*/ public final class NarClassLoaders { @@ -49,18 +51,18 @@ public final class NarClassLoaders { private final File frameworkWorkingDir; private final File extensionWorkingDir; - private final ClassLoader frameworkClassLoader; - private final Map<String, ClassLoader> extensionClassLoaders; + private final Bundle frameworkBundle; + private final Map<String, Bundle> bundles; private InitContext( final File frameworkDir, final File extensionDir, - final ClassLoader frameworkClassloader, - final Map<String, ClassLoader> extensionClassLoaders) { + final Bundle frameworkBundle, + final Map<String, Bundle> bundles) { this.frameworkWorkingDir = frameworkDir; this.extensionWorkingDir = extensionDir; - this.frameworkClassLoader = frameworkClassloader; - this.extensionClassLoaders = extensionClassLoaders; + this.frameworkBundle = frameworkBundle; + this.bundles = bundles; } } @@ -127,12 +129,13 @@ public final class NarClassLoaders { ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); // find all nar files and create class loaders for them. 
- final Map<String, ClassLoader> extensionDirectoryClassLoaderLookup = new LinkedHashMap<>(); - final Map<String, ClassLoader> narIdClassLoaderLookup = new HashMap<>(); + final Map<String, Bundle> narDirectoryBundleLookup = new LinkedHashMap<>(); + final Map<String, ClassLoader> narCoordinateClassLoaderLookup = new HashMap<>(); + final Map<String, Set<BundleCoordinate>> narIdBundleLookup = new HashMap<>(); // make sure the nar directory is there and accessible - FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir); - FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(frameworkWorkingDir); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir); final List<File> narWorkingDirContents = new ArrayList<>(); final File[] frameworkWorkingDirContents = frameworkWorkingDir.listFiles(); @@ -145,20 +148,30 @@ public final class NarClassLoaders { } if (!narWorkingDirContents.isEmpty()) { - final List<NarDetails> narDetails = new ArrayList<>(); + final List<BundleDetails> narDetails = new ArrayList<>(); + final Map<String,String> narCoordinatesToWorkingDir = new HashMap<>(); // load the nar details which includes and nar dependencies for (final File unpackedNar : narWorkingDirContents) { - final NarDetails narDetail = getNarDetails(unpackedNar); + BundleDetails narDetail = null; + try { + narDetail = getNarDetails(unpackedNar); + } catch (IllegalStateException e) { + logger.warn("Unable to load NAR {} due to {}, skipping...", + new Object[] {unpackedNar.getAbsolutePath(), e.getMessage()}); + } - // ensure the nar contained an identifier - if (narDetail.getNarId() == null) { - logger.warn("No NAR Id found. 
Skipping: " + unpackedNar.getAbsolutePath()); - continue; + // prevent the application from starting when there are two NARs with same group, id, and version + final String narCoordinate = narDetail.getCoordinate().getCoordinate(); + if (narCoordinatesToWorkingDir.containsKey(narCoordinate)) { + final String existingNarWorkingDir = narCoordinatesToWorkingDir.get(narCoordinate); + throw new IllegalStateException("Unable to load NAR with coordinates " + narCoordinate + + " and working directory " + narDetail.getWorkingDirectory() + + " because another NAR with the same coordinates already exists at " + existingNarWorkingDir); } - // store the nar details narDetails.add(narDetail); + narCoordinatesToWorkingDir.put(narCoordinate, narDetail.getWorkingDirectory().getCanonicalPath()); } int narCount; @@ -167,22 +180,50 @@ public final class NarClassLoaders { narCount = narDetails.size(); // attempt to create each nar class loader - for (final Iterator<NarDetails> narDetailsIter = narDetails.iterator(); narDetailsIter.hasNext();) { - final NarDetails narDetail = narDetailsIter.next(); - final String narDependencies = narDetail.getNarDependencyId(); + for (final Iterator<BundleDetails> narDetailsIter = narDetails.iterator(); narDetailsIter.hasNext();) { + final BundleDetails narDetail = narDetailsIter.next(); + final BundleCoordinate narDependencyCoordinate = narDetail.getDependencyCoordinate(); // see if this class loader is eligible for loading ClassLoader narClassLoader = null; - if (narDependencies == null) { - narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), currentContextClassLoader); - } else if (narIdClassLoaderLookup.containsKey(narDetail.getNarDependencyId())) { - narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), narIdClassLoaderLookup.get(narDetail.getNarDependencyId())); + if (narDependencyCoordinate == null) { + narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), currentContextClassLoader); + } 
else { + final String dependencyCoordinateStr = narDependencyCoordinate.getCoordinate(); + + // if the declared dependency has already been loaded + if (narCoordinateClassLoaderLookup.containsKey(dependencyCoordinateStr)) { + final ClassLoader narDependencyClassLoader = narCoordinateClassLoaderLookup.get(dependencyCoordinateStr); + narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), narDependencyClassLoader); + } else { + // get all bundles that match the declared dependency id + final Set<BundleCoordinate> coordinates = narIdBundleLookup.get(narDependencyCoordinate.getId()); + + // ensure there are known bundles that match the declared dependency id + if (coordinates != null && !coordinates.contains(narDependencyCoordinate)) { + // ensure the declared dependency only has one possible bundle + if (coordinates.size() == 1) { + // get the bundle with the matching id + final BundleCoordinate coordinate = coordinates.stream().findFirst().get(); + + // if that bundle is loaded, use it + if (narCoordinateClassLoaderLookup.containsKey(coordinate.getCoordinate())) { + logger.warn(String.format("While loading '%s' unable to locate exact NAR dependency '%s'. Only found one possible match '%s'. 
Continuing...", + narDetail.getCoordinate().getCoordinate(), dependencyCoordinateStr, coordinate.getCoordinate())); + + final ClassLoader narDependencyClassLoader = narCoordinateClassLoaderLookup.get(coordinate.getCoordinate()); + narClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), narDependencyClassLoader); + } + } + } + } } // if we were able to create the nar class loader, store it and remove the details - if (narClassLoader != null) { - extensionDirectoryClassLoaderLookup.put(narDetail.getNarWorkingDirectory().getCanonicalPath(), narClassLoader); - narIdClassLoaderLookup.put(narDetail.getNarId(), narClassLoader); + final ClassLoader bundleClassLoader = narClassLoader; + if (bundleClassLoader != null) { + narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, bundleClassLoader)); + narCoordinateClassLoaderLookup.put(narDetail.getCoordinate().getCoordinate(), narClassLoader); narDetailsIter.remove(); } } @@ -191,12 +232,18 @@ public final class NarClassLoaders { } while (narCount != narDetails.size()); // see if any nars couldn't be loaded - for (final NarDetails narDetail : narDetails) { - logger.warn(String.format("Unable to resolve required dependency '%s'. Skipping NAR %s", narDetail.getNarDependencyId(), narDetail.getNarWorkingDirectory().getAbsolutePath())); + for (final BundleDetails narDetail : narDetails) { + logger.warn(String.format("Unable to resolve required dependency '%s'. 
Skipping NAR '%s'", + narDetail.getDependencyCoordinate().getId(), narDetail.getWorkingDirectory().getAbsolutePath())); } } - return new InitContext(frameworkWorkingDir, extensionsWorkingDir, narIdClassLoaderLookup.get(FRAMEWORK_NAR_ID), new LinkedHashMap<>(extensionDirectoryClassLoaderLookup)); + // find the framework bundle, NarUnpacker already checked that there was a framework NAR and that there was only one + final Bundle frameworkBundle = narDirectoryBundleLookup.values().stream() + .filter(b -> b.getBundleDetails().getCoordinate().getId().equals(FRAMEWORK_NAR_ID)) + .findFirst().orElse(null); + + return new InitContext(frameworkWorkingDir, extensionsWorkingDir, frameworkBundle, new LinkedHashMap<>(narDirectoryBundleLookup)); } /** @@ -223,50 +270,36 @@ public final class NarClassLoaders { * @return details about the NAR * @throws IOException ioe */ - private static NarDetails getNarDetails(final File narDirectory) throws IOException { - final NarDetails narDetails = new NarDetails(); - narDetails.setNarWorkingDirectory(narDirectory); - - final File manifestFile = new File(narDirectory, "META-INF/MANIFEST.MF"); - try (final FileInputStream fis = new FileInputStream(manifestFile)) { - final Manifest manifest = new Manifest(fis); - final Attributes attributes = manifest.getMainAttributes(); - - // get the nar details - narDetails.setNarId(attributes.getValue("Nar-Id")); - narDetails.setNarDependencyId(attributes.getValue("Nar-Dependency-Id")); - } - - return narDetails; + private static BundleDetails getNarDetails(final File narDirectory) throws IOException { + return NarBundleUtil.fromNarDirectory(narDirectory); } /** - * @return the framework class loader + * @return the framework class Bundle * - * @throws IllegalStateException if the frame class loader has not been - * loaded + * @throws IllegalStateException if the frame Bundle has not been loaded */ - public ClassLoader getFrameworkClassLoader() { + public Bundle getFrameworkBundle() { if (initContext == 
null) { - throw new IllegalStateException("Framework class loader has not been loaded."); + throw new IllegalStateException("Framework bundle has not been loaded."); } - return initContext.frameworkClassLoader; + return initContext.frameworkBundle; } /** * @param extensionWorkingDirectory the directory - * @return the class loader for the specified working directory. Returns - * null when no class loader exists for the specified working directory - * @throws IllegalStateException if the class loaders have not been loaded + * @return the bundle for the specified working directory. Returns + * null when no bundle exists for the specified working directory + * @throws IllegalStateException if the bundles have not been loaded */ - public ClassLoader getExtensionClassLoader(final File extensionWorkingDirectory) { + public Bundle getBundle(final File extensionWorkingDirectory) { if (initContext == null) { throw new IllegalStateException("Extensions class loaders have not been loaded."); } try { - return initContext.extensionClassLoaders.get(extensionWorkingDirectory.getCanonicalPath()); + return initContext.bundles.get(extensionWorkingDirectory.getCanonicalPath()); } catch (final IOException ioe) { if(logger.isDebugEnabled()){ logger.debug("Unable to get extension classloader for working directory '{}'", extensionWorkingDirectory); @@ -276,45 +309,15 @@ public final class NarClassLoaders { } /** - * @return the extension class loaders - * @throws IllegalStateException if the class loaders have not been loaded + * @return the extensions that have been loaded + * @throws IllegalStateException if the extensions have not been loaded */ - public Set<ClassLoader> getExtensionClassLoaders() { + public Set<Bundle> getBundles() { if (initContext == null) { - throw new IllegalStateException("Extensions class loaders have not been loaded."); + throw new IllegalStateException("Bundles have not been loaded."); } - return new 
LinkedHashSet<>(initContext.extensionClassLoaders.values()); + return new LinkedHashSet<>(initContext.bundles.values()); } - private static class NarDetails { - - private String narId; - private String narDependencyId; - private File narWorkingDirectory; - - public String getNarDependencyId() { - return narDependencyId; - } - - public void setNarDependencyId(String narDependencyId) { - this.narDependencyId = narDependencyId; - } - - public String getNarId() { - return narId; - } - - public void setNarId(String narId) { - this.narId = narId; - } - - public File getNarWorkingDirectory() { - return narWorkingDirectory; - } - - public void setNarWorkingDirectory(File narWorkingDirectory) { - this.narWorkingDirectory = narWorkingDirectory; - } - } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java index 56aff9e..098b7d3 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -16,11 +16,11 @@ */ package org.apache.nifi.nar; +import java.io.Closeable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; - /** * */ @@ -48,7 +48,7 @@ public class NarCloseable implements Closeable { public static NarCloseable withComponentNarLoader(final Class componentClass, final String componentIdentifier) { final ClassLoader current = 
Thread.currentThread().getContextClassLoader(); - ClassLoader componentClassLoader = ExtensionManager.getClassLoader(componentClass.getName(), componentIdentifier); + ClassLoader componentClassLoader = ExtensionManager.getInstanceClassLoader(componentIdentifier); if (componentClassLoader == null) { componentClassLoader = componentClass.getClassLoader(); } @@ -58,6 +58,20 @@ public class NarCloseable implements Closeable { } /** + * Sets the current thread context class loader to the provided class loader, and returns a NarCloseable that will + * return the current thread context class loader to its previous state. + * + * @param componentNarLoader the class loader to set as the current thread context class loader + * + * @return NarCloseable that will return the current thread context class loader to its previous state + */ + public static NarCloseable withComponentNarLoader(final ClassLoader componentNarLoader) { + final ClassLoader current = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(componentNarLoader); + return new NarCloseable(current); + } + + /** * Creates a Closeable object that can be used to to switch to current class * loader to the framework class loader and will automatically set the * ClassLoader back to the previous class loader when closed @@ -67,7 +81,7 @@ public class NarCloseable implements Closeable { public static NarCloseable withFrameworkNar() { final ClassLoader frameworkClassLoader; try { - frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader(); + frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader(); } catch (final Exception e) { // This should never happen in a running instance, but it will occur in unit tests logger.error("Unable to access Framework ClassLoader due to " + e + ". 
Will continue without changing ClassLoaders."); http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java new file mode 100644 index 0000000..8b02742 --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarManifestEntry.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +/** + * Enumeration of entries that will be in a NAR MANIFEST file. 
+ */ +public enum NarManifestEntry { + + NAR_GROUP("Nar-Group"), + NAR_ID("Nar-Id"), + NAR_VERSION("Nar-Version"), + NAR_DEPENDENCY_GROUP("Nar-Dependency-Group"), + NAR_DEPENDENCY_ID("Nar-Dependency-Id"), + NAR_DEPENDENCY_VERSION("Nar-Dependency-Version"), + BUILD_TAG("Build-Tag"), + BUILD_REVISION("Build-Revision"), + BUILD_BRANCH("Build-Branch"), + BUILD_TIMESTAMP("Build-Timestamp"), + BUILD_JDK("Build-Jdk"), + BUILT_BY("Built-By"), + ; + + final String manifestName; + + NarManifestEntry(String manifestName) { + this.manifestName = manifestName; + } + + public String getManifestName() { + return manifestName; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java index 6a66ba2..364d3a9 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -18,7 +18,9 @@ package org.apache.nifi.nar; import org.apache.nifi.authentication.LoginIdentityProvider; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.repository.ContentRepository; import 
org.apache.nifi.controller.repository.FlowFileRepository; @@ -68,6 +70,7 @@ public class NarThreadContextClassLoader extends URLClassLoader { narSpecificClasses.add(FlowFileRepository.class); narSpecificClasses.add(FlowFileSwapManager.class); narSpecificClasses.add(ContentRepository.class); + narSpecificClasses.add(StateProvider.class); } private NarThreadContextClassLoader() { @@ -187,15 +190,17 @@ public class NarThreadContextClassLoader extends URLClassLoader { final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance()); try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(implementationClassName); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(implementationClassName, true, ExtensionManager.getClassLoader(implementationClassName)); + final List<Bundle> bundles = ExtensionManager.getBundles(implementationClassName); + if (bundles.size() == 0) { + throw new IllegalStateException(String.format("The specified implementation class '%s' is not known to this nifi.", implementationClassName)); } + if (bundles.size() > 1) { + throw new IllegalStateException(String.format("More than one bundle was found for the specified implementation class '%s', only one is allowed.", implementationClassName)); + } + + final Bundle bundle = bundles.get(0); + final ClassLoader detectedClassLoaderForType = bundle.getClassLoader(); + final Class<?> rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType); Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); final Class<?> desiredClass = rawClass.asSubclass(typeDefinition); 
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java new file mode 100644 index 0000000..0fb2bad --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/SystemBundle.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.bundle.BundleDetails; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; + +import java.io.File; + +/** + * Utility to create the system bundle. 
+ */ +public final class SystemBundle { + + public static final BundleCoordinate SYSTEM_BUNDLE_COORDINATE = new BundleCoordinate( + BundleCoordinate.DEFAULT_GROUP, "system", BundleCoordinate.DEFAULT_VERSION); + + /** + * Returns a bundle representing the system class loader. + * + * @param niFiProperties a NiFiProperties instance which will be used to obtain the default NAR library path, + * which will become the working directory of the returned bundle + * @return a bundle for the system class loader + */ + public static Bundle create(final NiFiProperties niFiProperties) { + final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader(); + + final String narLibraryDirectory = niFiProperties.getProperty(NiFiProperties.NAR_LIBRARY_DIRECTORY); + if (StringUtils.isBlank(narLibraryDirectory)) { + throw new IllegalStateException("Unable to create system bundle because " + NiFiProperties.NAR_LIBRARY_DIRECTORY + " was null or empty"); + } + + final BundleDetails systemBundleDetails = new BundleDetails.Builder() + .workingDir(new File(narLibraryDirectory)) + .coordinate(SYSTEM_BUNDLE_COORDINATE) + .build(); + + return new Bundle(systemBundleDetails, systemClassLoader); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml index 555a25c..d0257a1 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml @@ -36,9 +36,9 @@ <!-- Provide a cap of 10 
MB across all archive files --> <totalSizeCap>10MB</totalSizeCap> </rollingPolicy> + <immediateFlush>true</immediateFlush> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%date %level [%thread] %logger{40} %msg%n</pattern> - <immediateFlush>true</immediateFlush> </encoder> </appender> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b5561877/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java index 573b139..8ee1626 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java @@ -16,11 +16,23 @@ */ package org.apache.nifi.minifi; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarClassLoaders; +import org.apache.nifi.nar.NarUnpacker; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.util.FileUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + import java.io.File; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Executors; @@ -32,15 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicLong; // These are from the minifi-nar-utils -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarClassLoaders; -import org.apache.nifi.nar.NarUnpacker; -import org.apache.nifi.util.FileUtils; - -import org.apache.nifi.util.NiFiProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; public class MiNiFi { @@ -111,13 +114,16 @@ public class MiNiFi { NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); // load the framework classloader - final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader(); + final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader(); if (frameworkClassLoader == null) { throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); } + final Bundle systemBundle = SystemBundle.create(properties); + final Set<Bundle> narBundles = NarClassLoaders.getInstance().getBundles(); + // discover the extensions - ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders()); + ExtensionManager.discoverExtensions(systemBundle, narBundles); ExtensionManager.logClassLoaderMapping(); // load the server from the framework classloader