Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 483958ba0 -> 322dac622


NIFI-600: Adding compression support to PutHDFS and GetHDFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a973cc4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a973cc4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a973cc4f

Branch: refs/heads/develop
Commit: a973cc4f2a39cce356125a40b1a4d16fb54601ac
Parents: e1aa489
Author: Tim Reardon <tequal...@gmail.com>
Authored: Thu May 7 20:23:30 2015 -0400
Committer: Tim Reardon <tequal...@gmail.com>
Committed: Thu May 7 20:41:11 2015 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         |  31 ++++++++++++++++
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  10 ++++--
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  11 ++++--
 .../nifi/processors/hadoop/GetHDFSTest.java     |  36 +++++++++++++++++--
 .../nifi/processors/hadoop/PutHDFSTest.java     |  17 ++++++++-
 .../test/resources/testdata/randombytes-1.gz    | Bin 0 -> 500175 bytes
 6 files changed, 98 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

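For readers skimming the diff below: the change adds a "Compression codec" property to the shared AbstractHadoopProcessor, resolves it to a Hadoop CompressionCodec, and wraps the raw HDFS streams with it in GetHDFS (read side) and PutHDFS (write side). A minimal, self-contained sketch of that wrapping pattern (the path, buffer size, and payload here are illustrative, not from the commit):

    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class CodecWrapSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path("/tmp/example.gz"); // illustrative path

            // Resolve a codec by class name, as getCompressionCodec() does.
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodecByClassName(GzipCodec.class.getName());

            // Write side (PutHDFS): compress on the way out.
            try (OutputStream out = codec.createOutputStream(fs.create(path, true))) {
                out.write("hello".getBytes("UTF-8"));
            }

            // Read side (GetHDFS): decompress on the way in.
            try (InputStream in = codec.createInputStream(fs.open(path, 4096))) {
                System.out.println(in.read()); // first decompressed byte: 'h'
            }
        }
    }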

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 355950f..548d34c 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -42,6 +42,13 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.net.NetUtils;
 
 /**
@@ -60,6 +67,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+        .name("Compression codec")
+        .required(false)
+        .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
+            GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
+        .build();
+
     protected static final List<PropertyDescriptor> properties;
 
     static {
@@ -228,6 +242,23 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         };
     }
 
+    /**
+     * Returns the configured CompressionCodec, or null if none is configured.
+     *
+     * @param context the ProcessContext
+     * @param configuration the Hadoop Configuration
+     * @return CompressionCodec or null
+     */
+    protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
+        CompressionCodec codec = null;
+        if (context.getProperty(COMPRESSION_CODEC).isSet()) {
+            String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
+            CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
+            codec = ccf.getCodecByClassName(compressionClassname);
+        }
+
+        return codec;
+    }
 
     /**
      * Returns the relative path of the child that does not include the filename

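One behavior of the factory lookup above that is worth knowing before reading the tests further down: getCodecByClassName() returns null for a class name the factory has not registered, so getCompressionCodec() never throws on a bad value; the allowable-values list on the descriptor is what actually rejects typos at validation time. A quick illustration, assuming only Hadoop's stock codecs are registered:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class CodecLookupSketch {
        public static void main(String[] args) {
            CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
            // A registered codec class name resolves to an instance.
            System.out.println(factory.getCodecByClassName(GzipCodec.class.getName())); // non-null
            // An unknown name yields null rather than an exception.
            System.out.println(factory.getCodecByClassName("com.example.NoSuchCodec")); // null
        }
    }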
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 7aa534f..cac61b0 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.hadoop;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,11 +34,11 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -192,6 +193,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         props.add(POLLING_INTERVAL);
         props.add(BATCH_SIZE);
         props.add(BUFFER_SIZE);
+        props.add(COMPRESSION_CODEC);
         localProperties = Collections.unmodifiableList(props);
     }
 
@@ -329,7 +331,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
-        FSDataInputStream stream = null;
+        InputStream stream = null;
         Configuration conf = getConfiguration();
         FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@@ -337,6 +339,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
                 BUFFER_SIZE_DEFAULT);
         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
+        final CompressionCodec codec = getCompressionCodec(context, conf);
         for (final Path file : files) {
             try {
                 if (!hdfs.exists(file)) {
@@ -346,6 +349,9 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 final String relativePath = getPathDifference(rootDir, file);
 
                 stream = hdfs.open(file, bufferSize);
+                if (codec != null) {
+                    stream = codec.createInputStream(stream);
+                }
                 FlowFile flowFile = session.create();
 
                 final StopWatch stopWatch = new StopWatch(true);

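The read side above simply layers codec.createInputStream() over hdfs.open(), so for gzip the result is interchangeable with the JDK's own gzip streams. A small local round-trip check under that assumption (plain byte-array streams, no HDFS involved):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.zip.GZIPOutputStream;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class GzipReadSketch {
        public static void main(String[] args) throws Exception {
            byte[] original = "compressed at rest, plain in the flow file".getBytes("UTF-8");

            // JDK gzip stands in for a .gz file already sitting on HDFS.
            ByteArrayOutputStream gz = new ByteArrayOutputStream();
            try (GZIPOutputStream out = new GZIPOutputStream(gz)) {
                out.write(original);
            }

            // Decompress the way GetHDFS now does: wrap the raw stream with the codec.
            CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
            CompressionCodec codec = factory.getCodecByClassName(GzipCodec.class.getName());
            try (InputStream in = codec.createInputStream(new ByteArrayInputStream(gz.toByteArray()))) {
                System.out.println(Arrays.equals(original, IOUtils.toByteArray(in))); // true
            }
        }
    }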
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 52cf475..419b1de 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,10 +28,10 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -157,6 +158,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         props.add(UMASK);
         props.add(REMOTE_OWNER);
         props.add(REMOTE_GROUP);
+        props.add(COMPRESSION_CODEC);
         localProperties = Collections.unmodifiableList(props);
     }
 
@@ -215,6 +217,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
         final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
                 .getDefaultReplication(configuredRootDirPath);
 
+        final CompressionCodec codec = getCompressionCodec(context, configuration);
+
         Path tempDotCopyFile = null;
         try {
             final Path tempCopyFile;
@@ -266,10 +270,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
                 @Override
                 public void process(InputStream in) throws IOException {
-                    FSDataOutputStream fos = null;
+                    OutputStream fos = null;
                     Path createdFile = null;
                     try {
                         fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                        if (codec != null) {
+                            fos = codec.createOutputStream(fos);
+                        }
                         createdFile = tempCopyFile;
                         BufferedInputStream bis = new BufferedInputStream(in);
                         StreamUtils.copy(bis, fos);

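Note that the write side compresses the content but leaves the flow file's filename untouched, so keeping the extension in step with the codec is up to the flow designer. If a helper for that were wanted, CompressionCodec already exposes the canonical suffix; the method below is hypothetical, not part of this commit:

    import org.apache.hadoop.io.compress.CompressionCodec;

    public final class CompressedNameSketch {
        private CompressedNameSketch() {}

        // Hypothetical helper: append the codec's canonical suffix (".gz",
        // ".bz2", ...) unless the filename already carries it.
        public static String withCodecExtension(String filename, CompressionCodec codec) {
            if (codec == null) {
                return filename;
            }
            String ext = codec.getDefaultExtension();
            return filename.endsWith(ext) ? filename : filename + ext;
        }
    }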
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index d6015e0..b0dd17f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.hadoop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -30,8 +32,9 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -105,6 +108,19 @@ public class GetHDFSTest {
         for (ValidationResult vr : results) {
             Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
         }
+
+        results = new HashSet<>();
+        runner.setProperty(GetHDFS.DIRECTORY, "/target");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
+        }
     }
 
     @Test
@@ -115,9 +131,25 @@ public class GetHDFSTest {
         runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
         runner.run();
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
-        assertEquals(3, flowFiles.size());
+        assertEquals(4, flowFiles.size());
         for (MockFlowFile flowFile : flowFiles) {
             assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("random"));
         }
     }
+
+    @Test
+    public void testGetFilesWithCompression() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
+        flowFile.assertContentEquals(expected);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index df386d7..0b5fee8 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.nifi.processors.hadoop.PutHDFS;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -33,10 +34,10 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -136,6 +137,20 @@ public class PutHDFSTest {
         for (ValidationResult vr : results) {
             assertTrue(vr.toString().contains("is invalid because octal umask [2000] is not a valid umask"));
         }
+
+        results = new HashSet<>();
+        runner = TestRunners.newTestRunner(PutHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "/target");
+        runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
+        }
     }
 
     // The following only seems to work from cygwin...something about not finding the 'chmod' command.

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a973cc4f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz
new file mode 100755
index 0000000..df844f2
Binary files /dev/null and b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/randombytes-1.gz differ

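For completeness: a fixture like randombytes-1.gz can be regenerated from the existing randombytes-1 resource with plain JDK gzip. A throwaway generator along these lines (the relative paths assume it is run from the nifi-hdfs-processors module root):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.zip.GZIPOutputStream;

    import org.apache.commons.io.IOUtils;

    public class MakeGzFixture {
        public static void main(String[] args) throws Exception {
            try (FileInputStream in = new FileInputStream("src/test/resources/testdata/randombytes-1");
                 GZIPOutputStream out = new GZIPOutputStream(
                         new FileOutputStream("src/test/resources/testdata/randombytes-1.gz"))) {
                IOUtils.copy(in, out); // commons-io, already on this module's classpath
            }
        }
    }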