Repository: nifi
Updated Branches:
  refs/heads/master 4a25402c1 -> 57ae9b65a


NIFI-1184 Mock FileSystem for PutHDFSTest

This closes #2892

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/57ae9b65
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/57ae9b65
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/57ae9b65

Branch: refs/heads/master
Commit: 57ae9b65a0fd9f9f0880331debabbd75e64e1c40
Parents: 4a25402
Author: uday <udaygk...@gmail.com>
Authored: Sun Jul 15 02:24:58 2018 +0530
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Fri Sep 14 13:30:03 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/hadoop/PutHDFSTest.java     | 193 ++++++++++++++-----
 1 file changed, 140 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/57ae9b65/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 32569ac..46b377d 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -17,9 +17,14 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -36,12 +41,14 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,7 +58,6 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -59,29 +65,19 @@ public class PutHDFSTest {
 
     private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
-
-    @BeforeClass
-    public static void setUpClass() throws Exception {
-        /*
-         * Running Hadoop on Windows requires a special build which will 
produce required binaries and native modules [1]. Since functionality
-         * provided by this module and validated by these test does not have 
any native implication we do not distribute required binaries and native modules
-         * to support running these tests in Windows environment, therefore 
they are ignored. You can also get more info from this StackOverflow thread [2]
-         *
-         * [1] https://wiki.apache.org/hadoop/Hadoop2OnWindows
-         * [2] 
http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
-         */
-    }
+    private FileSystem mockFileSystem;
 
     @Before
     public void setup() {
         mockNiFiProperties = mock(NiFiProperties.class);
         
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
         kerberosProperties = new KerberosProperties(null);
+        mockFileSystem = new MockFileSystem();
     }
 
     @Test
     public void testValidators() {
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
         Collection<ValidationResult> results;
         ProcessContext pc;
@@ -119,7 +115,7 @@ public class PutHDFSTest {
             assertTrue(vr.toString().contains("is invalid because short 
integer must be greater than zero"));
         }
 
-        proc = new TestablePutHDFS(kerberosProperties);
+        proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         runner = TestRunners.newTestRunner(proc);
         results = new HashSet<>();
         runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -134,7 +130,7 @@ public class PutHDFSTest {
             assertTrue(vr.toString().contains("is invalid because short 
integer must be greater than zero"));
         }
 
-        proc = new TestablePutHDFS(kerberosProperties);
+        proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         runner = TestRunners.newTestRunner(proc);
         results = new HashSet<>();
         runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -149,7 +145,7 @@ public class PutHDFSTest {
             assertTrue(vr.toString().contains("is invalid because octal umask 
[-1] cannot be negative"));
         }
 
-        proc = new TestablePutHDFS(kerberosProperties);
+        proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         runner = TestRunners.newTestRunner(proc);
         results = new HashSet<>();
         runner.setProperty(PutHDFS.DIRECTORY, "/target");
@@ -178,7 +174,7 @@ public class PutHDFSTest {
         }
 
         results = new HashSet<>();
-        proc = new TestablePutHDFS(kerberosProperties);
+        proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutHDFS.DIRECTORY, "/target");
         runner.setProperty(PutHDFS.COMPRESSION_CODEC, 
CompressionCodec.class.getName());
@@ -196,9 +192,8 @@ public class PutHDFSTest {
     @Test
     public void testPutFile() throws IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
         runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
@@ -209,9 +204,6 @@ public class PutHDFSTest {
             runner.run();
         }
 
-        Configuration config = new Configuration();
-        FileSystem fs = FileSystem.get(config);
-
         List<MockFlowFile> failedFlowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("failure").build());
         assertTrue(failedFlowFiles.isEmpty());
@@ -219,7 +211,7 @@ public class PutHDFSTest {
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertTrue(mockFileSystem.exists(new 
Path("target/test-classes/randombytes-1")));
         assertEquals("randombytes-1", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
 
@@ -234,9 +226,8 @@ public class PutHDFSTest {
     @Test
     public void testPutFileWithCompression() throws IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
         runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
@@ -248,9 +239,6 @@ public class PutHDFSTest {
             runner.run();
         }
 
-        Configuration config = new Configuration();
-        FileSystem fs = FileSystem.get(config);
-
         List<MockFlowFile> failedFlowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("failure").build());
         assertTrue(failedFlowFiles.isEmpty());
@@ -258,7 +246,7 @@ public class PutHDFSTest {
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(fs.exists(new 
Path("target/test-classes/randombytes-1.gz")));
+        assertTrue(mockFileSystem.exists(new 
Path("target/test-classes/randombytes-1.gz")));
         assertEquals("randombytes-1.gz", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
     }
@@ -266,14 +254,11 @@ public class PutHDFSTest {
     @Test
     public void testPutFileWithException() throws IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
         String dirName = "target/testPutFileWrongPermissions";
         File file = new File(dirName);
         file.mkdirs();
-        Configuration config = new Configuration();
-        FileSystem fs = FileSystem.get(config);
-        Path p = new Path(dirName).makeQualified(fs.getUri(), 
fs.getWorkingDirectory());
+        Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), 
mockFileSystem.getWorkingDirectory());
 
         final KerberosProperties testKerberosProperties = kerberosProperties;
         TestRunner runner = TestRunners.newTestRunner(new PutHDFS() {
@@ -302,28 +287,24 @@ public class PutHDFSTest {
         assertFalse(failedFlowFiles.isEmpty());
         assertTrue(failedFlowFiles.get(0).isPenalized());
 
-        fs.delete(p, true);
+        mockFileSystem.delete(p, true);
     }
 
     @Test
     public void testPutFileWhenDirectoryUsesValidELFunction() throws 
IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(PutHDFS.DIRECTORY, 
"target/data_${literal('testing'):substring(0,4)}");
         runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
         try (FileInputStream fis = new 
FileInputStream("src/test/resources/testdata/randombytes-1");) {
-            Map<String, String> attributes = new HashMap<String, String>();
+            Map<String, String> attributes = new HashMap<>();
             attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
             runner.enqueue(fis, attributes);
             runner.run();
         }
 
-        Configuration config = new Configuration();
-        FileSystem fs = FileSystem.get(config);
-
         List<MockFlowFile> failedFlowFiles = runner
                 .getFlowFilesForRelationship(new 
Relationship.Builder().name("failure").build());
         assertTrue(failedFlowFiles.isEmpty());
@@ -331,7 +312,7 @@ public class PutHDFSTest {
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertTrue(mockFileSystem.exists(new 
Path("target/data_test/randombytes-1")));
         assertEquals("randombytes-1", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/data_test", 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
     }
@@ -339,9 +320,8 @@ public class PutHDFSTest {
     @Test
     public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws 
IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
 
         // this value somehow causes NiFi to not even recognize the EL, and 
thus it returns successfully from calling
@@ -362,9 +342,8 @@ public class PutHDFSTest {
     @Test
     public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
         // Refer to comment in the BeforeClass method for an explanation
-        assumeTrue(isNotWindows());
 
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
         TestRunner runner = TestRunners.newTestRunner(proc);
         // the validator should pick up the invalid EL
         runner.setProperty(PutHDFS.DIRECTORY, 
"target/data_${literal('testing'):foo()}");
@@ -372,22 +351,130 @@ public class PutHDFSTest {
         runner.assertNotValid();
     }
 
-    private boolean isNotWindows() {
-        return !System.getProperty("os.name").startsWith("Windows");
-    }
-
-    private static class TestablePutHDFS extends PutHDFS {
+    private class TestablePutHDFS extends PutHDFS {
 
         private KerberosProperties testKerberosProperties;
+        private FileSystem fileSystem;
 
-        public TestablePutHDFS(KerberosProperties testKerberosProperties) {
+        public TestablePutHDFS(KerberosProperties testKerberosProperties, 
FileSystem fileSystem) {
             this.testKerberosProperties = testKerberosProperties;
+            this.fileSystem = fileSystem;
         }
 
         @Override
         protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
             return testKerberosProperties;
         }
+
+        @Override
+        protected FileSystem getFileSystem(Configuration config) throws 
IOException {
+            return fileSystem;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+    }
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+
+        @Override
+        public URI getUri() {
+            return URI.create("file:///");
+        }
+
+        @Override
+        public FSDataInputStream open(final Path f, final int bufferSize) 
throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(final Path f, final FsPermission 
permission, final boolean overwrite, final int bufferSize, final short 
replication,
+                                         final long blockSize, final 
Progressable progress) throws IOException {
+            pathToStatus.put(f, newFile(f));
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new 
Statistics(""));
+        }
+
+        @Override
+        public FSDataOutputStream append(final Path f, final int bufferSize, 
final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(final Path src, final Path dst) throws 
IOException {
+            if (pathToStatus.containsKey(src)) {
+                pathToStatus.put(dst, pathToStatus.remove(src));
+            } else {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public boolean delete(final Path f, final boolean recursive) throws 
IOException {
+            if (pathToStatus.containsKey(f)) {
+                pathToStatus.remove(f);
+            } else {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public FileStatus[] listStatus(final Path f) throws 
FileNotFoundException, IOException {
+            return null;
+        }
+
+        @Override
+        public void setWorkingDirectory(final Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(final Path f, final FsPermission permission) 
throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean mkdirs(Path f) throws IOException {
+            pathToStatus.put(f, newDir(f));
+            return true;
+        }
+
+        @Override
+        public FileStatus getFileStatus(final Path f) throws IOException {
+            final FileStatus fileStatus = pathToStatus.get(f);
+            if (fileStatus == null) throw new FileNotFoundException();
+            return fileStatus;
+        }
+
+        @Override
+        public boolean exists(Path f) throws IOException {
+            return pathToStatus.containsKey(f);
+        }
+
+        private FileStatus newFile(Path p) {
+            return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 
1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", p);
+        }
+
+        private FileStatus newDir(Path p) {
+            return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 
1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", p);
+        }
+
+        @Override
+        public long getDefaultBlockSize(Path f) {
+            return 33554432L;
+        }
     }
 
+    static FsPermission perms(short p) {
+        return new FsPermission(p);
+    }
 }

Reply via email to