Repository: flink
Updated Branches:
  refs/heads/release-0.8 a6f9f9939 -> d573926ab


[FLINK-1640] Remove trailing slash from paths. Add tests for Path and 
FileOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d573926a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d573926a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d573926a

Branch: refs/heads/release-0.8
Commit: d573926ab50a03bb27e4a5e31e869ae9274be766
Parents: a6f9f99
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Mar 4 15:24:26 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Mar 6 01:08:00 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/Path.java     |  45 ++-
 .../api/common/io/FileOutputFormatTest.java     | 379 +++++++++++--------
 .../java/org/apache/flink/core/fs/PathTest.java | 142 +++++++
 3 files changed, 390 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d573926a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 950a38b..ca550aa 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -41,6 +41,8 @@ import org.apache.flink.util.StringUtils;
 /**
  * Names a file or directory in a {@link FileSystem}. Path strings use slash as
  * the directory separator. A path string is absolute if it begins with a 
slash.
+ *
+ * Trailing slashes are removed from the path.
  */
 public class Path implements IOReadableWritable, Serializable {
        
@@ -75,7 +77,7 @@ public class Path implements IOReadableWritable, Serializable 
{
         * Constructs a path object from a given URI.
         * 
         * @param uri
-        *        the URI to contruct the path object from
+        *        the URI to construct the path object from
         */
        public Path(URI uri) {
                this.uri = uri;
@@ -147,20 +149,24 @@ public class Path implements IOReadableWritable, 
Serializable {
        }
 
        /**
-        * Checks if the provided path string is either null or has zero length 
and throws
+        * Checks if the provided path string is either null or has zero length 
and throws
         * a {@link IllegalArgumentException} if any of the two conditions 
apply.
-        * 
+        * In addition, leading and trailing whitespace is removed.
+        *
         * @param path
         *        the path string to be checked
+        * @return The checked and trimmed path.
         */
-       private void checkPathArg(String path) {
+       private String checkAndTrimPathArg(String path) {
                // disallow construction of a Path from an empty string
                if (path == null) {
                        throw new IllegalArgumentException("Can not create a 
Path from a null string");
                }
+               path = path.trim();
                if (path.length() == 0) {
                        throw new IllegalArgumentException("Can not create a 
Path from an empty string");
                }
+               return path;
        }
 
        /**
@@ -171,7 +177,7 @@ public class Path implements IOReadableWritable, 
Serializable {
         *        the string to construct a path from
         */
        public Path(String pathString) {
-               checkPathArg(pathString);
+               pathString = checkAndTrimPathArg(pathString);
 
                // We can't use 'new URI(String)' directly, since it assumes 
things are
                // escaped, which we don't require of Paths.
@@ -221,7 +227,7 @@ public class Path implements IOReadableWritable, 
Serializable {
         *        the path string
         */
        public Path(String scheme, String authority, String path) {
-               checkPathArg(path);
+               path = checkAndTrimPathArg(path);
                initialize(scheme, authority, path);
        }
 
@@ -251,9 +257,18 @@ public class Path implements IOReadableWritable, 
Serializable {
         * @return the normalized path string
         */
        private String normalizePath(String path) {
-               // remove double slashes & backslashes
-               path = path.replace("//", "/");
+
+               // remove leading and trailing whitespace
+               path = path.trim();
+
+               // remove consecutive slashes & backslashes
                path = path.replace("\\", "/");
+               path = path.replaceAll("/+", "/");
+
+               // remove trailing separator
+               if(!path.equals(SEPARATOR) && path.endsWith(SEPARATOR)) {
+                       path = path.substring(0, path.length() - 
SEPARATOR.length());
+               }
 
                return path;
        }
@@ -310,23 +325,19 @@ public class Path implements IOReadableWritable, 
Serializable {
        }
 
        /**
-        * Returns the final component of this path.
+        * Returns the final component of this path, i.e., everything that 
follows the last separator.
         * 
         * @return the final component of the path
         */
        public String getName() {
                final String path = uri.getPath();
-               if (path.endsWith(SEPARATOR)) {
-                       final int slash = path.lastIndexOf(SEPARATOR, 
path.length() - SEPARATOR.length() - 1);
-                       return path.substring(slash + 1, path.length() - 
SEPARATOR.length());
-               } else {
-                       final int slash = path.lastIndexOf(SEPARATOR);
-                       return path.substring(slash + 1);
-               }
+               final int slash = path.lastIndexOf(SEPARATOR);
+               return path.substring(slash + 1);
        }
 
        /**
-        * Returns the parent of a path or <code>null</code> if at root.
+        * Returns the parent of a path, i.e., everything that precedes the 
last separator
+        * or <code>null</code> if at root.
         * 
         * @return the parent of a path or <code>null</code> if at root.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/d573926a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index 43ded56..cc040b6 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -31,20 +31,19 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
+
 public class FileOutputFormatTest {
 
        @Test
-       public void testCreateNoneParallelLocalFS() {
-               
+       public void testCreateNonParallelLocalFS() throws IOException {
+
                File tmpOutPath = null;
                File tmpOutFile = null;
-               try {
-                       tmpOutPath = 
File.createTempFile("fileOutputFormatTest", "Test1");
-                       tmpOutFile = new 
File(tmpOutPath.getAbsolutePath()+"/1");
-               } catch (IOException e) {
-                       throw new RuntimeException("Test in error", e);
-               }
-               
+
+               tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
+               tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
                String tmpFilePath = tmpOutPath.toURI().toString();
 
                // check fail if file exists
@@ -54,18 +53,17 @@ public class FileOutputFormatTest {
                dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
                dfof.configure(new Configuration());
-               
-               boolean exception = false;
+
                try {
                        dfof.open(0, 1);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
+               tmpOutPath.delete();
 
                // check fail if directory exists
-               tmpOutPath.delete();
                Assert.assertTrue("Directory could not be created.", 
tmpOutPath.mkdir());
 
                dfof = new DummyFileOutputFormat();
@@ -74,57 +72,72 @@ public class FileOutputFormatTest {
                dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
                dfof.configure(new Configuration());
-               
-               exception = false;
+
                try {
                        dfof.open(0, 1);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
-               
-               // check success
                tmpOutPath.delete();
-               
+
+               // check success
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
                dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
 
                dfof.configure(new Configuration());
-               
-               exception = false;
+
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-               
+               tmpOutPath.delete();
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+               dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 1);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+               tmpOutPath.delete();
+
                // ----------- test again with always directory mode
-               
+
                // check fail if file exists
+               tmpOutPath.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
                dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
                dfof.configure(new Configuration());
-               
-               exception = false;
+
                try {
                        dfof.open(0, 1);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
+               tmpOutPath.delete();
 
                // check success if directory exists
-               tmpOutPath.delete();
                Assert.assertTrue("Directory could not be created.", 
tmpOutPath.mkdir());
 
                dfof = new DummyFileOutputFormat();
@@ -133,20 +146,18 @@ public class FileOutputFormatTest {
                dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
 
                dfof.configure(new Configuration());
-               
-               exception = false;
+
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+               (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
 
                // check custom file name inside directory if directory exists
-               (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -155,15 +166,13 @@ public class FileOutputFormatTest {
                Configuration c = new Configuration();
                dfof.configure(c);
 
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
                File customOutFile = new 
File(tmpOutPath.getAbsolutePath()+"/fancy-1-0.avro");
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(customOutFile.exists() && 
customOutFile.isFile());
                customOutFile.delete();
@@ -171,11 +180,8 @@ public class FileOutputFormatTest {
                // check fail if file in directory exists
                // create file for test
                customOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
-               try {
-                       customOutFile.createNewFile();
-               } catch (IOException e) {
-                       Assert.fail("Error creating file");
-               }
+               customOutFile.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -183,20 +189,17 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
-               
-               // check success if no file exists
-               // delete existing files
                (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
                tmpOutPath.delete();
-               
+
+               // check success if no file exists
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -204,35 +207,47 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-                               
-               // clean up
                (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
                tmpOutPath.delete();
-               
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+'/'));
+               dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 1);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
+               Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+               (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
+               tmpOutPath.delete();
+
        }
        
        @Test
-       public void testCreateParallelLocalFS() {
+       public void testCreateParallelLocalFS() throws IOException {
                
                File tmpOutPath = null;
                File tmpOutFile = null;
-               try {
-                       tmpOutPath = 
File.createTempFile("fileOutputFormatTest", "Test1");
-                       tmpOutFile = new 
File(tmpOutPath.getAbsolutePath()+"/1");
-               } catch (IOException e) {
-                       throw new RuntimeException("Test in error", e);
-               }
-               
+
+               tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
+               tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
                String tmpFilePath = tmpOutPath.toURI().toString();
 
                // check fail if file exists
@@ -243,17 +258,16 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               boolean exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
+               tmpOutPath.delete();
 
                // check success if directory exists
-               tmpOutPath.delete();
                Assert.assertTrue("Directory could not be created.", 
tmpOutPath.mkdir());
 
                dfof = new DummyFileOutputFormat();
@@ -263,18 +277,21 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
+               tmpOutFile.delete();
+               tmpOutPath.delete();
+
                // check fail if file in directory exists
+               tmpOutPath.mkdir();
+               tmpOutFile.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -282,20 +299,17 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
+                       fail();
                } catch (Exception e) {
-                       exception = true;
+                       // exception expected
                }
-               Assert.assertTrue(exception);
-               
-               // check success if no file exists
-               // delete existing files
                tmpOutFile.delete();
                tmpOutPath.delete();
-               
+
+               // check success if no file exists
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.NO_OVERWRITE);
@@ -303,34 +317,47 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
-               // clean up
                tmpOutFile.delete();
                tmpOutPath.delete();
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+               dfof.setWriteMode(WriteMode.NO_OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 2);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
+               Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+               tmpOutFile.delete();
+               tmpOutPath.delete();
+
        }
        
        @Test
-       public void testOverwriteNoneParallelLocalFS() {
+       public void testOverwriteNonParallelLocalFS() throws IOException {
                
                File tmpOutPath = null;
                File tmpOutFile = null;
-               try {
-                       tmpOutPath = 
File.createTempFile("fileOutputFormatTest", "Test1");
-                       tmpOutFile = new 
File(tmpOutPath.getAbsolutePath()+"/1");
-               } catch (IOException e) {
-                       throw new RuntimeException("Test in error", e);
-               }
-               
+
+               tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
+               tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
                String tmpFilePath = tmpOutPath.toURI().toString();
 
                // check success if file exists
@@ -341,14 +368,12 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               boolean exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
 
                // check success if directory exists
@@ -362,19 +387,16 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-               
-               // check success
                tmpOutPath.delete();
-               
+
+               // check success
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -382,19 +404,37 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
-               
+               tmpOutPath.delete();
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+               dfof.setWriteMode(WriteMode.OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 1);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isFile());
+               tmpOutPath.delete();
+
                // ----------- test again with always directory mode
                
                // check success if file exists
+               tmpOutPath.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -402,20 +442,19 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
 
-               // check success if directory exists
                tmpOutFile.delete();
                tmpOutPath.delete();
+
+               // check success if directory exists
                Assert.assertTrue("Directory could not be created.", 
tmpOutPath.mkdir());
 
                dfof = new DummyFileOutputFormat();
@@ -425,18 +464,21 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
+               tmpOutPath.delete();
+               tmpOutFile.delete();
+
                // check success if file in directory exists
+               tmpOutPath.mkdir();
+               tmpOutFile.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -444,22 +486,18 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
-               // check success if no file exists
-               // delete existing files
-               tmpOutFile.delete();
                tmpOutPath.delete();
-               
+               tmpOutFile.delete();
+
+               // check success if no file exists
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -467,35 +505,46 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 1);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
+               Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+               tmpOutFile.delete();
+               tmpOutPath.delete();
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+               dfof.setWriteMode(WriteMode.OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 1);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && 
tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
-               // clean up
                tmpOutFile.delete();
                tmpOutPath.delete();
-               
        }
        
        @Test
-       public void testOverwriteParallelLocalFS() {
+       public void testOverwriteParallelLocalFS() throws IOException {
                
                File tmpOutPath = null;
                File tmpOutFile = null;
-               try {
-                       tmpOutPath = 
File.createTempFile("fileOutputFormatTest", "Test1");
-                       tmpOutFile = new 
File(tmpOutPath.getAbsolutePath()+"/1");
-               } catch (IOException e) {
-                       throw new RuntimeException("Test in error", e);
-               }
-               
+
+               tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
+               tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+
                String tmpFilePath = tmpOutPath.toURI().toString();
 
                // check success if file exists
@@ -506,20 +555,18 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               boolean exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-
-               // check success if directory exists
                tmpOutFile.delete();
                tmpOutPath.delete();
+
+               // check success if directory exists
                Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir());
 
                dfof = new DummyFileOutputFormat();
@@ -529,18 +576,21 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
+               tmpOutFile.delete();
+               tmpOutPath.delete();
+
                // check success if file in directory exists
+               tmpOutPath.mkdir();
+               tmpOutFile.createNewFile();
+
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -548,22 +598,18 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
-               // check success if no file exists
-               // delete existing files
                (new File(tmpOutPath.getAbsoluteFile()+"/1")).delete();
                tmpOutPath.delete();
-               
+
+               // check success if no file exists
                dfof = new DummyFileOutputFormat();
                dfof.setOutputFilePath(new Path(tmpFilePath));
                dfof.setWriteMode(WriteMode.OVERWRITE);
@@ -571,21 +617,36 @@ public class FileOutputFormatTest {
 
                dfof.configure(new Configuration());
                
-               exception = false;
                try {
                        dfof.open(0, 2);
                        dfof.close();
                } catch (Exception e) {
-                       exception = true;
+                       fail();
                }
-               Assert.assertTrue(!exception);
                Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
                Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
-               
-               // clean up
                tmpOutFile.delete();
                tmpOutPath.delete();
-               
+
+               // check success for path with trailing '/'
+               dfof = new DummyFileOutputFormat();
+               dfof.setOutputFilePath(new Path(tmpFilePath+"/"));
+               dfof.setWriteMode(WriteMode.OVERWRITE);
+               dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY);
+
+               dfof.configure(new Configuration());
+
+               try {
+                       dfof.open(0, 2);
+                       dfof.close();
+               } catch (Exception e) {
+                       fail();
+               }
+               Assert.assertTrue(tmpOutPath.exists() && tmpOutPath.isDirectory());
+               Assert.assertTrue(tmpOutFile.exists() && tmpOutFile.isFile());
+               tmpOutFile.delete();
+               tmpOutPath.delete();
+
        }
        
        // -------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d573926a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
index 7f04fa6..1912cb3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
@@ -23,6 +23,148 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class PathTest {
+
+       @Test
+       public void testPathFromString() {
+
+               Path p = new Path("/my/path");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("/my/path/");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("/my//path/");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("/my//path//a///");
+               assertEquals("/my/path/a", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("\\my\\path\\\\a\\\\\\");
+               assertEquals("/my/path/a", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("/my/path/ ");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertNull(p.toUri().getScheme());
+
+               p = new Path("hdfs:///my/path");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertEquals("hdfs", p.toUri().getScheme());
+
+               p = new Path("hdfs:///my/path/");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertEquals("hdfs", p.toUri().getScheme());
+
+               p = new Path("file:///my/path");
+               assertEquals("/my/path", p.toUri().getPath());
+               assertEquals("file", p.toUri().getScheme());
+
+               try {
+                       new Path((String)null);
+                       fail();
+               } catch(Exception e) {
+                       // exception expected
+               }
+
+               try {
+                       new Path("");
+                       fail();
+               } catch(Exception e) {
+                       // exception expected
+               }
+
+               try {
+                       new Path(" ");
+                       fail();
+               } catch(Exception e) {
+                       // exception expected
+               }
+
+       }
+
+       @Test
+       public void testIsAbsolute() {
+
+               Path p = new Path("/my/abs/path");
+               assertTrue(p.isAbsolute());
+
+               p = new Path("./my/rel/path");
+               assertFalse(p.isAbsolute());
+
+       }
+
+       @Test
+       public void testGetName() {
+
+               Path p = new Path("/my/fancy/path");
+               assertEquals("path", p.getName());
+
+               p = new Path("/my/fancy/path/");
+               assertEquals("path", p.getName());
+
+               p = new Path("hdfs:///my/path");
+               assertEquals("path", p.getName());
+
+               p = new Path("hdfs:///myPath/");
+               assertEquals("myPath", p.getName());
+
+               p = new Path("/");
+               assertEquals("", p.getName());
+
+       }
+
+       @Test
+       public void testGetParent() {
+
+               Path p = new Path("/my/fancy/path");
+               assertEquals("/my/fancy", p.getParent().toUri().getPath());
+
+               p = new Path("/my/other/fancy/path/");
+               assertEquals("/my/other/fancy", p.getParent().toUri().getPath());
+
+               p = new Path("hdfs:///my/path");
+               assertEquals("/my", p.getParent().toUri().getPath());
+
+               p = new Path("hdfs:///myPath/");
+               assertEquals("/", p.getParent().toUri().getPath());
+
+               p = new Path("/");
+               assertNull(p.getParent());
+       }
+
+       @Test
+       public void testSuffix() {
+
+               Path p = new Path("/my/path");
+               p = p.suffix("_123");
+               assertEquals("/my/path_123", p.toUri().getPath());
+
+               p = new Path("/my/path/");
+               p = p.suffix("/abc");
+               assertEquals("/my/path/abc", p.toUri().getPath());
+
+       }
+
+       @Test
+       public void testDepth() {
+
+               Path p = new Path("/my/path");
+               assertEquals(2, p.depth());
+
+               p = new Path("/my/fancy/path/");
+               assertEquals(3, p.depth());
+
+               p = new Path("/my/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/fancy/path");
+               assertEquals(12, p.depth());
+
+               p = new Path("/");
+               assertEquals(0, p.depth());
+       }
+
        @Test
        public void testParsing() {
                URI u;

Reply via email to