Repository: nifi
Updated Branches:
  refs/heads/master 3c694b641 -> fe59b3415
NIFI-3003 Upgrading hadoop.version to 2.7.3, fixing a TDE issue with PutHDFS, ensuring cleanup of instance class loaders, and adding a classpath-resource property to all HDFS processors

NIFI-3003 Addressing review feedback

NIFI-3003 Added minor NOTICE updates

This closes #1219


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fe59b341
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fe59b341
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fe59b341

Branch: refs/heads/master
Commit: fe59b3415c0215a9a4eb01c32838c5dbb584c009
Parents: 3c694b6
Author: Bryan Bende <bbe...@apache.org>
Authored: Wed Nov 9 16:42:27 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Nov 15 16:23:27 2016 -0500

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   8 +-
 .../init/ControllerServiceInitializer.java      |   3 +
 .../init/ProcessorInitializer.java              |   3 +
 .../init/ReportingTaskingInitializer.java       |   3 +
 .../nifi/fingerprint/FingerprintFactory.java    |   7 +
 .../nifi/groups/StandardProcessGroup.java       |  19 +-
 .../org/apache/nifi/nar/ExtensionManager.java   |  10 +-
 .../apache/nifi/nar/InstanceClassLoader.java    |  23 +-
 .../hadoop/AbstractHadoopProcessor.java         | 135 ++++++---
 .../apache/nifi/processors/hadoop/PutHDFS.java  | 281 ++++++++++---------
 .../src/main/resources/META-INF/NOTICE         |   8 +
 .../src/main/resources/META-INF/NOTICE         |  14 +-
 pom.xml                                         |   8 +-
 13 files changed, 319 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
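The TDE (transparent data encryption) fix can be seen in the PutHDFS diff below: all filesystem work is moved inside UserGroupInformation.doAs(), so every FileSystem call executes with the logged-in user's credentials, which is what key resolution for HDFS encrypted zones requires. The following is a minimal sketch of that wrapping pattern only, not the committed code; runAsUser() and the work parameter are hypothetical stand-ins for the processor's real FileSystem calls (create, append, rename, and so on):

    import java.security.PrivilegedAction;

    import org.apache.hadoop.security.UserGroupInformation;

    class DoAsSketch {
        // Everything inside run() executes as the given user, not as the
        // JVM's ambient identity.
        static void runAsUser(final UserGroupInformation ugi, final Runnable work) {
            ugi.doAs(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    work.run();
                    return null;
                }
            });
        }
    }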
http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index fd340b0..e920ff6 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -914,18 +914,14 @@ The following binary components are provided under the Apache Software License v
   Java Native Access
   Copyright 2015 Java Native Access

-  (ASLv2) HTrace Core
+  (ASLv2) Apache HTrace Core
     The following NOTICE information applies:
-      In addition, this product includes software dependencies. See
-      the accompanying LICENSE.txt for a listing of dependencies
-      that are NOT Apache licensed (with pointers to their licensing)
+      Copyright 2016 The Apache Software Foundation

       Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
       is a distributed tracing system that is Apache 2.0 Licensed.
       Copyright 2012 Twitter, Inc.
-
-  (ASLv2) Groovy (org.codehaus.groovy:groovy:jar:2.4.5 - http://www.groovy-lang.org)
     The following NOTICE information applies:
       Groovy Language

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
index 90c1e24..8bef0d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
@@ -26,6 +26,7 @@ import org.apache.nifi.documentation.mock.MockControllerServiceInitializationCon
 import org.apache.nifi.documentation.mock.MockComponentLogger;
 import org.apache.nifi.documentation.util.ReflectionUtils;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.reporting.InitializationException;
@@ -53,6 +54,8 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
             final ComponentLog logger = new MockComponentLogger();
             final MockConfigurationContext context = new MockConfigurationContext();
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, controllerService, logger, context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
index ae28299..4d38641 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
@@ -24,6 +24,7 @@ import org.apache.nifi.documentation.mock.MockProcessorInitializationContext;
 import org.apache.nifi.documentation.mock.MockComponentLogger;
 import org.apache.nifi.documentation.util.ReflectionUtils;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -52,6 +53,8 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
             final ComponentLog logger = new MockComponentLogger();
             final MockProcessContext context = new MockProcessContext();
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, processor, logger, context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
index 041ff3e..a47959b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
@@ -23,6 +23,7 @@ import org.apache.nifi.documentation.mock.MockConfigurationContext;
 import org.apache.nifi.documentation.mock.MockComponentLogger;
 import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
 import org.apache.nifi.documentation.util.ReflectionUtils;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
@@ -51,6 +52,8 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
             final MockConfigurationContext context = new MockConfigurationContext();
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);
+        } finally {
+            ExtensionManager.removeInstanceClassLoaderIfExists(component.getIdentifier());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 0ab39a2..c4da7b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.util.DomUtils;
@@ -489,6 +490,12 @@ public class FingerprintFactory {
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
+        } finally {
+            // The processor instance is only for fingerprinting so we can remove the InstanceClassLoader here
+            // since otherwise it will stick around in the map forever
+            if (processor != null) {
+                ExtensionManager.removeInstanceClassLoaderIfExists(processor.getIdentifier());
+            }
         }

         // properties

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 312103e..2c9af28 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -707,6 +707,7 @@ public final class StandardProcessGroup implements ProcessGroup {

     @Override
     public void removeProcessor(final ProcessorNode processor) {
+        boolean removed = false;
         final String id = requireNonNull(processor).getIdentifier();
         writeLock.lock();
         try {
@@ -756,9 +757,16 @@ public final class StandardProcessGroup implements ProcessGroup {
                 removeConnection(conn);
             }

-            ExtensionManager.removeInstanceClassLoaderIfExists(id);
+            removed = true;
             LOG.info("{} removed from flow", processor);
+        } finally {
+            if (removed) {
+                try {
+                    ExtensionManager.removeInstanceClassLoaderIfExists(id);
+                } catch (Throwable t) {
+                }
+            }
             writeLock.unlock();
         }
     }
@@ -1850,6 +1858,7 @@ public final class StandardProcessGroup implements ProcessGroup {

     @Override
     public void removeControllerService(final ControllerServiceNode service) {
+        boolean removed = false;
         writeLock.lock();
         try {
             final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier());
@@ -1880,8 +1889,16 @@ public final class StandardProcessGroup implements ProcessGroup {
             controllerServices.remove(service.getIdentifier());
             flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
+            removed = true;
             LOG.info("{} removed from {}", service, this);
+        } finally {
+            if (removed) {
+                try {
+                    ExtensionManager.removeInstanceClassLoaderIfExists(service.getIdentifier());
+                } catch (Throwable t) {
+                }
+            }
             writeLock.unlock();
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
index f9cf9eb..745ed9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java
@@ -200,9 +200,9 @@ public class ExtensionManager {
             // InstanceClassLoader that has the NAR ClassLoader as a parent
             if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) {
                 final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader;
-                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
+                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
             } else {
-                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, new URL[0], registeredClassLoader);
+                instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, new URL[0], registeredClassLoader);
             }

             instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader);
@@ -218,7 +218,11 @@ public class ExtensionManager {
      * @return the removed ClassLoader for the given instance, or null if not found
      */
     public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) {
-        ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
+        if (instanceIdentifier == null) {
+            return null;
+        }
+
+        final ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
         if (classLoader != null && (classLoader instanceof URLClassLoader)) {
             final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
index 42d273a..2a9c72d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java
@@ -35,6 +35,7 @@ public class InstanceClassLoader extends URLClassLoader {
     private static final Logger logger = LoggerFactory.getLogger(InstanceClassLoader.class);

     private final String identifier;
+    private final String instanceType;
     private ShimClassLoader shimClassLoader;

     /**
@@ -42,9 +43,10 @@ public class InstanceClassLoader extends URLClassLoader {
      * @param urls the URLs for the ClassLoader
      * @param parent the parent ClassLoader
      */
-    public InstanceClassLoader(final String identifier, final URL[] urls, final ClassLoader parent) {
+    public InstanceClassLoader(final String identifier, final String type, final URL[] urls, final ClassLoader parent) {
         super(urls, parent);
         this.identifier = identifier;
+        this.instanceType = type;
     }

     /**
@@ -58,12 +60,11 @@ public class InstanceClassLoader extends URLClassLoader {
             try {
                 shimClassLoader.close();
             } catch (IOException e) {
-                logger.warn("Unable to close URLClassLoader for " + identifier);
+                logger.warn("Unable to close inner URLClassLoader for " + identifier);
             }
         }

-        // don't set a parent here b/c otherwise it will create an infinite loop
-        shimClassLoader = new ShimClassLoader(urls, null);
+        shimClassLoader = new ShimClassLoader(urls, getParent());
     }

     /**
@@ -88,7 +89,7 @@ public class InstanceClassLoader extends URLClassLoader {
         if (shimClassLoader != null) {
             try {
                 c = shimClassLoader.loadClass(name, resolve);
-            } catch (ClassNotFoundException cnf) {
+            } catch (ClassNotFoundException e) {
                 c = null;
             }
         }
@@ -119,6 +120,18 @@ public class InstanceClassLoader extends URLClassLoader {
         }
     }

+    @Override
+    public void close() throws IOException {
+        if (shimClassLoader != null) {
+            try {
+                shimClassLoader.close();
+            } catch (IOException e) {
+                logger.warn("Unable to close inner URLClassLoader for " + identifier);
+            }
+        }
+        super.close();
+    }
+
     /**
      * Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate.
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index ecb24aa..8d31a7a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -47,6 +47,7 @@ import javax.net.SocketFactory;

 import java.io.File;
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
@@ -55,6 +56,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

@@ -121,6 +124,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();

+    public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
+            .name("Additional Classpath Resources")
+            .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a "
+                    + "directory, all files within the directory will be added to the classpath, but further sub-directories will not be included.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dynamicallyModifiesClasspath(true)
+            .build();
+
     private static final Object RESOURCES_LOCK = new Object();

     private long kerberosReloginThreshold;
@@ -148,6 +160,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         props.add(kerberosProperties.getKerberosPrincipal());
         props.add(kerberosProperties.getKerberosKeytab());
         props.add(KERBEROS_RELOGIN_PERIOD);
+        props.add(ADDITIONAL_CLASSPATH_RESOURCES);
         properties = Collections.unmodifiableList(props);
     }

@@ -227,7 +240,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     private static Configuration getConfigurationFromResources(String configResources) throws IOException {
         boolean foundResources = false;
-        final Configuration config = new Configuration();
+        final Configuration config = new ExtendedConfiguration();
         if (null != configResources) {
             String[] resources = configResources.split(",");
             for (String resource : resources) {
@@ -257,51 +270,41 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
     HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
-        // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
-        // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
-        // NarThreadContextClassLoader.
-        ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-
-        try {
-            Configuration config = getConfigurationFromResources(configResources);
-
-            // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
-            checkHdfsUriForTimeout(config);
-
-            // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
-            // restart
-            String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
-            config.set(disableCacheName, "true");
-
-            // If kerberos is enabled, create the file system as the kerberos principal
-            // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
-            FileSystem fs;
-            UserGroupInformation ugi;
-            synchronized (RESOURCES_LOCK) {
-                if (SecurityUtil.isSecurityEnabled(config)) {
-                    String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
-                    String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
-                    ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
-                    fs = getFileSystemAsUser(config, ugi);
-                    lastKerberosReloginTime = System.currentTimeMillis() / 1000;
-                } else {
-                    config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
-                    config.set("hadoop.security.authentication", "simple");
-                    ugi = SecurityUtil.loginSimple(config);
-                    fs = getFileSystemAsUser(config, ugi);
-                }
+        Configuration config = getConfigurationFromResources(configResources);
+        config.setClassLoader(Thread.currentThread().getContextClassLoader()); // set the InstanceClassLoader
+
+        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
+        checkHdfsUriForTimeout(config);
+
+        // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
+        // restart
+        String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
+        config.set(disableCacheName, "true");
+
+        // If kerberos is enabled, create the file system as the kerberos principal
+        // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+        FileSystem fs;
+        UserGroupInformation ugi;
+        synchronized (RESOURCES_LOCK) {
+            if (SecurityUtil.isSecurityEnabled(config)) {
+                String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+                String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+                ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
+                fs = getFileSystemAsUser(config, ugi);
+                lastKerberosReloginTime = System.currentTimeMillis() / 1000;
+            } else {
+                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+                config.set("hadoop.security.authentication", "simple");
+                ugi = SecurityUtil.loginSimple(config);
+                fs = getFileSystemAsUser(config, ugi);
             }
+        }

-            final Path workingDir = fs.getWorkingDirectory();
-            getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
-
-            return new HdfsResources(config, fs, ugi);
+        final Path workingDir = fs.getWorkingDirectory();
+        getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
+                new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});

-        } finally {
-            Thread.currentThread().setContextClassLoader(savedClassLoader);
-        }
+        return new HdfsResources(config, fs, ugi);
     }

     /**
@@ -510,4 +513,50 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
     }

+    /**
+     * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
+     * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
+     * something that was previously not found, but might now be available.
+     *
+     * Reference the original getClassByNameOrNull from Configuration.
+     */
+    static class ExtendedConfiguration extends Configuration {
+
+        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>();
+
+        public Class<?> getClassByNameOrNull(String name) {
+            Map<String, WeakReference<Class<?>>> map;
+
+            synchronized (CACHE_CLASSES) {
+                map = CACHE_CLASSES.get(getClassLoader());
+                if (map == null) {
+                    map = Collections.synchronizedMap(new WeakHashMap<>());
+                    CACHE_CLASSES.put(getClassLoader(), map);
+                }
+            }
+
+            Class<?> clazz = null;
+            WeakReference<Class<?>> ref = map.get(name);
+            if (ref != null) {
+                clazz = ref.get();
+            }
+
+            if (clazz == null) {
+                try {
+                    clazz = Class.forName(name, true, getClassLoader());
+                } catch (ClassNotFoundException e) {
+                    e.printStackTrace();
+                    return null;
+                }
+                // two putters can race here, but they'll put the same class
+                map.put(name, new WeakReference<>(clazz));
+                return clazz;
+            } else {
+                // cache hit
+                return clazz;
+            }
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 90b25e0..f8841d6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -54,6 +55,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -205,177 +207,186 @@ public class PutHDFS extends AbstractHadoopProcessor {

     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }

-        final Configuration configuration = getConfiguration();
         final FileSystem hdfs = getFileSystem();
-        if (configuration == null || hdfs == null) {
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || hdfs == null || ugi == null) {
             getLogger().error("HDFS not configured properly");
             session.transfer(flowFile, REL_FAILURE);
             context.yield();
             return;
         }

-        Path tempDotCopyFile = null;
-        try {
-            final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-            final Path configuredRootDirPath = new Path(dirValue);
-
-            final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-
-            final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-            final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                Path tempDotCopyFile = null;
+                FlowFile putFlowFile = flowFile;
+                try {
+                    final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+                    final Path configuredRootDirPath = new Path(dirValue);

-            final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-            final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();

-            final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
-            final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
-                    .getDefaultReplication(configuredRootDirPath);
+                    final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+                    final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);

-            final CompressionCodec codec = getCompressionCodec(context, configuration);
+                    final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+                    final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);

-            final String filename = codec != null
-                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                    final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+                    final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
+                            .getDefaultReplication(configuredRootDirPath);

-            final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
-            final Path copyFile = new Path(configuredRootDirPath, filename);
+                    final CompressionCodec codec = getCompressionCodec(context, configuration);

-            // Create destination directory if it does not exist
-            try {
-                if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
-                    throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
-                }
-            } catch (FileNotFoundException fe) {
-                if (!hdfs.mkdirs(configuredRootDirPath)) {
-                    throw new IOException(configuredRootDirPath.toString() + " could not be created");
-                }
-                changeOwner(context, hdfs, configuredRootDirPath);
-            }
+                    final String filename = codec != null
+                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());

-            final boolean destinationExists = hdfs.exists(copyFile);
+                    final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+                    final Path copyFile = new Path(configuredRootDirPath, filename);

-            // If destination file already exists, resolve that based on processor configuration
-            if (destinationExists) {
-                switch (conflictResponse) {
-                    case REPLACE_RESOLUTION:
-                        if (hdfs.delete(copyFile, false)) {
-                            getLogger().info("deleted {} in order to replace with the contents of {}",
-                                    new Object[]{copyFile, flowFile});
-                        }
-                        break;
-                    case IGNORE_RESOLUTION:
-                        session.transfer(flowFile, REL_SUCCESS);
-                        getLogger().info("transferring {} to success because file with same name already exists",
-                                new Object[]{flowFile});
-                        return;
-                    case FAIL_RESOLUTION:
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_FAILURE);
-                        getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                new Object[]{flowFile});
-                        return;
-                    default:
-                        break;
-                }
-            }
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+                            throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(configuredRootDirPath)) {
+                            throw new IOException(configuredRootDirPath.toString() + " could not be created");
+                        }
+                        changeOwner(context, hdfs, configuredRootDirPath);
+                    }

-            // Write FlowFile to temp file on HDFS
-            final StopWatch stopWatch = new StopWatch(true);
-            session.read(flowFile, new InputStreamCallback() {
+                    final boolean destinationExists = hdfs.exists(copyFile);

-                @Override
-                public void process(InputStream in) throws IOException {
-                    OutputStream fos = null;
-                    Path createdFile = null;
-                    try {
-                        if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
-                            fos = hdfs.append(copyFile, bufferSize);
-                        } else {
-                            fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
-                        }
-                        if (codec != null) {
-                            fos = codec.createOutputStream(fos);
-                        }
-                        createdFile = tempCopyFile;
-                        BufferedInputStream bis = new BufferedInputStream(in);
-                        StreamUtils.copy(bis, fos);
-                        bis = null;
-                        fos.flush();
-                    } finally {
-                        try {
-                            if (fos != null) {
-                                fos.close();
-                            }
-                        } catch (RemoteException re) {
-                            // when talking to remote HDFS clusters, we don't notice problems until fos.close()
-                            if (createdFile != null) {
-                                try {
-                                    hdfs.delete(createdFile, false);
-                                } catch (Throwable ignore) {
-                                }
-                            }
-                            throw re;
-                        } catch (Throwable ignore) {
-                        }
-                        fos = null;
-                    }
-                }
-            });
-            stopWatch.stop();
-            final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
-            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-            tempDotCopyFile = tempCopyFile;
-
-            if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
-                    || (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
-                boolean renamed = false;
-                for (int i = 0; i < 10; i++) { // try to rename multiple times.
-                    if (hdfs.rename(tempCopyFile, copyFile)) {
-                        renamed = true;
-                        break;// rename was successful
-                    }
-                    Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-                }
-                if (!renamed) {
-                    hdfs.delete(tempCopyFile, false);
-                    throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
-                            + " to its final filename");
-                }
-
-                changeOwner(context, hdfs, copyFile);
-            }
-
-            getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
-                    new Object[]{flowFile, copyFile, millis, dataRate});
-
-            final String outputPath = copyFile.toString();
-            final String newFilename = copyFile.getName();
-            final String hdfsPath = copyFile.getParent().toString();
-            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
-            flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-            final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-            session.getProvenanceReporter().send(flowFile, transitUri);
-
-            session.transfer(flowFile, REL_SUCCESS);
-
-        } catch (final Throwable t) {
-            if (tempDotCopyFile != null) {
-                try {
-                    hdfs.delete(tempDotCopyFile, false);
-                } catch (Exception e) {
-                    getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
-                }
-            }
-            getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
-            context.yield();
-        }
+                    // If destination file already exists, resolve that based on processor configuration
+                    if (destinationExists) {
+                        switch (conflictResponse) {
+                            case REPLACE_RESOLUTION:
+                                if (hdfs.delete(copyFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{copyFile, putFlowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(putFlowFile, REL_SUCCESS);
+                                getLogger().info("transferring {} to success because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(putFlowFile), REL_FAILURE);
+                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            default:
+                                break;
+                        }
+                    }
+
+                    // Write FlowFile to temp file on HDFS
+                    final StopWatch stopWatch = new StopWatch(true);
+                    session.read(putFlowFile, new InputStreamCallback() {
+
+                        @Override
+                        public void process(InputStream in) throws IOException {
+                            OutputStream fos = null;
+                            Path createdFile = null;
+                            try {
+                                if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
+                                    fos = hdfs.append(copyFile, bufferSize);
+                                } else {
+                                    fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                                }
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                createdFile = tempCopyFile;
+                                BufferedInputStream bis = new BufferedInputStream(in);
+                                StreamUtils.copy(bis, fos);
+                                bis = null;
+                                fos.flush();
+                            } finally {
+                                try {
+                                    if (fos != null) {
+                                        fos.close();
+                                    }
+                                } catch (RemoteException re) {
+                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                                    if (createdFile != null) {
+                                        try {
+                                            hdfs.delete(createdFile, false);
+                                        } catch (Throwable ignore) {
+                                        }
+                                    }
+                                    throw re;
+                                } catch (Throwable ignore) {
+                                }
+                                fos = null;
+                            }
+                        }
+                    });
+                    stopWatch.stop();
+                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
+                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                    tempDotCopyFile = tempCopyFile;
+
+                    if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
+                            || (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
+                        boolean renamed = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                            if (hdfs.rename(tempCopyFile, copyFile)) {
+                                renamed = true;
+                                break; // rename was successful
+                            }
+                            Thread.sleep(200L); // try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!renamed) {
+                            hdfs.delete(tempCopyFile, false);
+                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                                    + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, copyFile);
+                    }
+
+                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+                            new Object[]{putFlowFile, copyFile, millis, dataRate});
+
+                    final String outputPath = copyFile.toString();
+                    final String newFilename = copyFile.getName();
+                    final String hdfsPath = copyFile.getParent().toString();
+                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
+                    session.getProvenanceReporter().send(putFlowFile, transitUri);
+
+                    session.transfer(putFlowFile, REL_SUCCESS);
+
+                } catch (final Throwable t) {
+                    if (tempDotCopyFile != null) {
+                        try {
+                            hdfs.delete(tempDotCopyFile, false);
+                        } catch (Exception e) {
+                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                        }
+                    }
+                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
+                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
+                    context.yield();
+                }
+
+                return null;
+            }
+        });
     }

     protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/src/main/resources/META-INF/NOTICE
index 894a43b..7a26c4d 100644
--- a/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-hadoop-libraries-bundle/nifi-hadoop-libraries-nar/src/main/resources/META-INF/NOTICE
@@ -253,6 +253,14 @@ The following binary components are provided under the Apache Software License v
       Apache Software Foundation that were originally developed at iClick, Inc.,
       software copyright (c) 1999.

+  (ASLv2) Apache HTrace Core
+    The following NOTICE information applies:
+      Copyright 2016 The Apache Software Foundation
+
+      Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+      is a distributed tracing system that is Apache 2.0 Licensed.
+      Copyright 2012 Twitter, Inc.
+
   (ASLv2) Apache Tomcat
     The following NOTICE information applies:
       Apache Tomcat

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service-nar/src/main/resources/META-INF/NOTICE
index 76ca05f..20551ba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service-nar/src/main/resources/META-INF/NOTICE
@@ -269,15 +269,13 @@ Apache Software License v2

     Licensed under the Apache License, Version 2.0

-  (ASLv2) HTrace Core
-    The following NOTICE information applies:
-      In addition, this product includes software dependencies. See
-      the accompanying LICENSE.txt for a listing of dependencies
-      that are NOT Apache licensed (with pointers to their licensing)
+  (ASLv2) Apache HTrace Core
+    The following NOTICE information applies:
+      Copyright 2016 The Apache Software Foundation

-      Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
-      is a distributed tracing system that is Apache 2.0 Licensed.
-      Copyright 2012 Twitter, Inc.
+      Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
+      is a distributed tracing system that is Apache 2.0 Licensed.
+      Copyright 2012 Twitter, Inc.

   (ASLv2) Jackson Core ASL
     The following NOTICE information applies:

http://git-wip-us.apache.org/repos/asf/nifi/blob/fe59b341/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0cacf93..957390c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@ language governing permissions and limitations under the License. -->
         <spring.version>4.2.4.RELEASE</spring.version>
         <spring.security.version>4.0.3.RELEASE</spring.security.version>
         <jersey.version>1.19</jersey.version>
-        <hadoop.version>2.6.2</hadoop.version>
+        <hadoop.version>2.7.3</hadoop.version>
         <hadoop.guava.version>12.0.1</hadoop.guava.version>
         <hadoop.http.client.version>4.2.5</hadoop.http.client.version>
         <yammer.metrics.version>2.2.0</yammer.metrics.version>
@@ -646,7 +646,6 @@ language governing permissions and limitations under the License. -->
             <artifactId>hadoop-hdfs</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-api</artifactId>
@@ -668,6 +667,11 @@ language governing permissions and limitations under the License. -->
             <version>${hadoop.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
             <version>1.7.7</version>
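One behavior worth noting from the AbstractHadoopProcessor change above: the new ExtendedConfiguration exists because, per its own javadoc, the stock org.apache.hadoop.conf.Configuration caches failed class lookups, so a JAR added at runtime via the new "Additional Classpath Resources" property would otherwise remain unloadable until the JVM restarts. A small sketch of that failure mode, under that assumption; the class name below is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    class NegativeCacheSketch {
        public static void main(String[] args) {
            final Configuration conf = new Configuration();
            try {
                // First lookup fails; with the stock Configuration the miss
                // can be cached for this ClassLoader.
                conf.getClassByName("com.example.MissingCodec"); // hypothetical class
            } catch (ClassNotFoundException e) {
                // Even if com.example.MissingCodec later appears on the
                // classpath, the cached miss keeps this lookup failing.
                // ExtendedConfiguration instead re-attempts Class.forName()
                // on every miss, caching only successful lookups.
            }
        }
    }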