Repository: incubator-nifi Updated Branches: refs/heads/develop 483958ba0 -> 322dac622
NIFI-600: Adding compression support to PutHDFS and GetHDFS Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a973cc4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a973cc4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a973cc4f Branch: refs/heads/develop Commit: a973cc4f2a39cce356125a40b1a4d16fb54601ac Parents: e1aa489 Author: Tim Reardon <tequal...@gmail.com> Authored: Thu May 7 20:23:30 2015 -0400 Committer: Tim Reardon <tequal...@gmail.com> Committed: Thu May 7 20:41:11 2015 -0400 ---------------------------------------------------------------------- .../hadoop/AbstractHadoopProcessor.java | 31 ++++++++++++++++ .../apache/nifi/processors/hadoop/GetHDFS.java | 10 ++++-- .../apache/nifi/processors/hadoop/PutHDFS.java | 11 ++++-- .../nifi/processors/hadoop/GetHDFSTest.java | 36 +++++++++++++++++-- .../nifi/processors/hadoop/PutHDFSTest.java | 17 ++++++++- .../test/resources/testdata/randombytes-1.gz | Bin 0 -> 500175 bytes 6 files changed, 98 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 355950f..548d34c 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -42,6 +42,13 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.net.NetUtils; /** @@ -60,6 +67,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { public static final String DIRECTORY_PROP_NAME = "Directory"; + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression codec") + .required(false) + .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), + GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()) + .build(); + protected static final List<PropertyDescriptor> properties; static { @@ -228,6 +242,23 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { }; } + /** + * Returns the configured CompressionCodec, or null if none is configured. + * + * @param context the ProcessContext + * @param configuration the Hadoop Configuration + * @return CompressionCodec or null + */ + protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) { + CompressionCodec codec = null; + if (context.getProperty(COMPRESSION_CODEC).isSet()) { + String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue(); + CompressionCodecFactory ccf = new CompressionCodecFactory(configuration); + codec = ccf.getCodecByClassName(compressionClassname); + } + + return codec; + } /** * Returns the relative path of the child that does not include the filename http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 7aa534f..cac61b0 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.hadoop; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,11 +34,11 @@ import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -192,6 +193,7 @@ public class GetHDFS extends AbstractHadoopProcessor { props.add(POLLING_INTERVAL); props.add(BATCH_SIZE); props.add(BUFFER_SIZE); + props.add(COMPRESSION_CODEC); localProperties = Collections.unmodifiableList(props); } @@ -329,7 +331,7 @@ public class GetHDFS extends AbstractHadoopProcessor { protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) { // process the batch of files - FSDataInputStream stream = null; + InputStream stream = null; Configuration conf = getConfiguration(); FileSystem hdfs = getFileSystem(); final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); @@ -337,6 +339,7 @@ public class GetHDFS extends AbstractHadoopProcessor { int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT); final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue()); + final CompressionCodec codec = getCompressionCodec(context, conf); for (final Path file : files) { try { if (!hdfs.exists(file)) { @@ -346,6 +349,9 @@ public class GetHDFS extends AbstractHadoopProcessor { final String relativePath = getPathDifference(rootDir, file); stream = hdfs.open(file, bufferSize); + if (codec != null) { + stream = codec.createInputStream(stream); + } FlowFile flowFile = session.create(); final StopWatch stopWatch = new StopWatch(true); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 52cf475..419b1de 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -27,10 +28,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.ipc.RemoteException; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -157,6 +158,7 @@ public class PutHDFS extends AbstractHadoopProcessor { props.add(UMASK); props.add(REMOTE_OWNER); props.add(REMOTE_GROUP); + props.add(COMPRESSION_CODEC); localProperties = Collections.unmodifiableList(props); } @@ -215,6 +217,8 @@ public class PutHDFS extends AbstractHadoopProcessor { final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs .getDefaultReplication(configuredRootDirPath); + final CompressionCodec codec = getCompressionCodec(context, configuration); + Path tempDotCopyFile = null; try { final Path tempCopyFile; @@ -266,10 +270,13 @@ public class PutHDFS extends AbstractHadoopProcessor { @Override public void process(InputStream in) throws IOException { - FSDataOutputStream fos = null; + OutputStream fos = null; Path createdFile = null; try { fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize); + if (codec != null) { + fos = codec.createOutputStream(fos); + } createdFile = tempCopyFile; BufferedInputStream bis = new BufferedInputStream(in); StreamUtils.copy(bis, fos); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java index d6015e0..b0dd17f 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java @@ -19,6 +19,8 @@ package org.apache.nifi.processors.hadoop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -30,8 +32,9 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; import org.junit.Assert; import org.junit.Test; @@ -105,6 +108,19 @@ public class GetHDFSTest { for (ValidationResult vr : results) { Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age")); } + + results = new HashSet<>(); + runner.setProperty(GetHDFS.DIRECTORY, "/target"); + runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set")); + } } @Test @@ -115,9 +131,25 @@ public class GetHDFSTest { runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); runner.run(); List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS); - assertEquals(3, flowFiles.size()); + assertEquals(4, flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("random")); } } + + @Test + public void testGetFilesWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); + runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); + runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName()); + runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); + runner.run(); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + MockFlowFile flowFile = flowFiles.get(0); + assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz")); + InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1"); + flowFile.assertContentEquals(expected); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index df386d7..0b5fee8 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.hadoop; import org.apache.nifi.processors.hadoop.PutHDFS; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -33,10 +34,10 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -136,6 +137,20 @@ public class PutHDFSTest { for (ValidationResult vr : results) { assertTrue(vr.toString().contains("is invalid because octal umask [2000] is not a valid umask")); } + + results = new HashSet<>(); + runner = TestRunners.newTestRunner(PutHDFS.class); + runner.setProperty(PutHDFS.DIRECTORY, "/target"); + runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName()); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + for (ValidationResult vr : results) { + Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set")); + } } // The following only seems to work from cygwin...something about not finding the 'chmod' command. http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz new file mode 100755 index 0000000..df844f2 Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz differ