[FLINK-1820] Consistent behavior of numeric value parsers.

This closes #566


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39d526e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39d526e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39d526e6

Branch: refs/heads/master
Commit: 39d526e6f8b26ff35e1023c65293982285ffcc78
Parents: 6403dbd
Author: FelixNeutatz <neut...@googlemail.com>
Authored: Fri Apr 3 18:13:32 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 19 00:52:29 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/ByteParser.java   |  68 +++++-----
 .../flink/types/parser/ByteValueParser.java     |   4 +
 .../apache/flink/types/parser/DoubleParser.java |  69 +++++-----
 .../flink/types/parser/DoubleValueParser.java   |   8 +-
 .../apache/flink/types/parser/FieldParser.java  |   8 +-
 .../apache/flink/types/parser/FloatParser.java  |  69 +++++-----
 .../flink/types/parser/FloatValueParser.java    |   8 +-
 .../apache/flink/types/parser/IntParser.java    |  69 +++++-----
 .../flink/types/parser/IntValueParser.java      |   4 +
 .../apache/flink/types/parser/LongParser.java   |  51 ++++----
 .../flink/types/parser/LongValueParser.java     |   4 +
 .../apache/flink/types/parser/ShortParser.java  |  47 +++----
 .../flink/types/parser/ShortValueParser.java    |   4 +
 .../flink/types/parser/ByteParserTest.java      |   7 +-
 .../flink/types/parser/ByteValueParserTest.java |   2 +-
 .../flink/types/parser/DoubleParserTest.java    |   2 +-
 .../types/parser/DoubleValueParserTest.java     |   2 +-
 .../flink/types/parser/FloatParserTest.java     |   2 +-
 .../types/parser/FloatValueParserTest.java      |   2 +-
 .../flink/types/parser/IntParserTest.java       |   2 +-
 .../flink/types/parser/IntValueParserTest.java  |   3 +-
 .../flink/types/parser/LongParserTest.java      |   2 +-
 .../flink/types/parser/LongValueParserTest.java |   2 +-
 .../flink/types/parser/ParserTestBase.java      |  40 +++++-
 .../flink/types/parser/ShortParserTest.java     |   2 +-
 .../types/parser/ShortValueParserTest.java      |   2 +-
 .../flink/api/java/io/CsvInputFormatTest.java   | 127 +++++++++++++++++--
 27 files changed, 405 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 5858da2..09e517a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -21,22 +21,23 @@ package org.apache.flink.types.parser;
 
 
 public class ByteParser extends FieldParser<Byte> {
-       
+
        private byte result;
-       
+
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Byte reusable) {
                int val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
-               
+               final int delimLimit = limit - delimiter.length + 1;
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
-                       
+
                        // check for empty field with only the sign
-                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, 
+                               delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
@@ -44,6 +45,10 @@ public class ByteParser extends FieldParser<Byte> {
 
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                this.result = (byte) (neg ? -val : val);
                                return i + delimiter.length;
                        }
@@ -53,17 +58,17 @@ public class ByteParser extends FieldParser<Byte> {
                        }
                        val *= 10;
                        val += bytes[i] - 48;
-                       
+
                        if (val > Byte.MAX_VALUE && (!neg || val > 
-Byte.MIN_VALUE)) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
                                return -1;
                        }
                }
-               
+
                this.result = (byte) (neg ? -val : val);
                return limit;
        }
-       
+
        @Override
        public Byte createValue() {
                return Byte.MIN_VALUE;
@@ -73,43 +78,40 @@ public class ByteParser extends FieldParser<Byte> {
        public Byte getLastResult() {
                return Byte.valueOf(this.result);
        }
-       
+
        /**
-        * Static utility to parse a field of type byte from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type byte from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final byte parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
-        * Static utility to parse a field of type byte from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type byte from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
-        * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final byte parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
                long val = 0;
                boolean neg = false;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
@@ -118,17 +120,17 @@ public class ByteParser extends FieldParser<Byte> {
                                throw new NumberFormatException("Orphaned minus 
sign.");
                        }
                }
-               
+
                for (; length > 0; startPos++, length--) {
                        if (bytes[startPos] == delimiter) {
-                               return (byte) (neg ? -val : val);
+                               throw new NumberFormatException("Empty field.");
                        }
                        if (bytes[startPos] < 48 || bytes[startPos] > 57) {
                                throw new NumberFormatException("Invalid 
character.");
                        }
                        val *= 10;
                        val += bytes[startPos] - 48;
-                       
+
                        if (val > Byte.MAX_VALUE && (!neg || val > 
-Byte.MIN_VALUE)) {
                                throw new NumberFormatException("Value 
overflow/underflow");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index f9b36e4..612a1cb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -52,6 +52,10 @@ public class ByteValueParser extends FieldParser<ByteValue> {
                for (int i = startPos; i < limit; i++) {
 
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                reusable.setValue((byte) (neg ? -val : val));
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 947fdfe..086c1f5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -23,35 +23,39 @@ package org.apache.flink.types.parser;
  * Parses a text field into a Double.
  */
 public class DoubleParser extends FieldParser<Double> {
-       
+
        private static final Double DOUBLE_INSTANCE = Double.valueOf(0.0);
-       
+
        private double result;
-       
+
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Double reusable) {
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
-               
+               final int delimLimit = limit - delimiter.length + 1;
+
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                break;
                        }
                        i++;
                }
-               
-               String str = new String(bytes, startPos, i-startPos);
+
+               String str = new String(bytes, startPos, i - startPos);
+               int len = str.length();
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       
setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+                       return -1;
+               }
                try {
                        this.result = Double.parseDouble(str);
                        return (i == limit) ? limit : i + delimiter.length;
-               }
-               catch (NumberFormatException e) {
+               } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
                }
        }
-       
+
        @Override
        public Double createValue() {
                return DOUBLE_INSTANCE;
@@ -61,35 +65,35 @@ public class DoubleParser extends FieldParser<Double> {
        public Double getLastResult() {
                return Double.valueOf(this.result);
        }
-       
+
        /**
-        * Static utility to parse a field of type double from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type double from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final double parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
-        * Static utility to parse a field of type double from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type double from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
-        * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final double parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
                if (length <= 0) {
@@ -97,12 +101,17 @@ public class DoubleParser extends FieldParser<Double> {
                }
                int i = 0;
                final byte delByte = (byte) delimiter;
-               
+
                while (i < length && bytes[i] != delByte) {
                        i++;
                }
-               
-               String str = new String(bytes, startPos, i);
+
+               String str = new String(bytes, startPos, i - startPos);
+               int len = str.length();
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the " +
+                               "numeric field: " + str);
+               }
                return Double.parseDouble(str);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index e225c1f..7751831 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -33,7 +33,7 @@ public class DoubleValueParser extends 
FieldParser<DoubleValue> {
                
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
@@ -42,7 +42,11 @@ public class DoubleValueParser extends 
FieldParser<DoubleValue> {
                        i++;
                }
                
-               String str = new String(bytes, startPos, i-startPos);
+               String str = new String(bytes, startPos, i - startPos);
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       
setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+                       return -1;
+               }
                try {
                        double value = Double.parseDouble(str);
                        reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 33697fd..55e9915 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -64,7 +64,13 @@ public abstract class FieldParser<T> {
                UNTERMINATED_QUOTED_STRING,
 
                /** The parser found characters between the end of the quoted 
string and the delimiter. */
-               UNQUOTED_CHARS_AFTER_QUOTED_STRING
+               UNQUOTED_CHARS_AFTER_QUOTED_STRING,
+               
+               /** The string is empty. */
+               EMPTY_STRING,
+
+               /** There is whitespace in a numeric field. */
+               WHITESPACE_IN_NUMERIC_FIELD
        }
        
        private ParseErrorState errorState = ParseErrorState.NONE;

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 7d166c7..be98aa1 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -23,15 +23,16 @@ package org.apache.flink.types.parser;
  * Parses a text field into a {@link Float}.
  */
 public class FloatParser extends FieldParser<Float> {
-       
+
        private float result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Float reusable) {
-               
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Float 
+               reusable) {
+
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
@@ -39,18 +40,23 @@ public class FloatParser extends FieldParser<Float> {
                        }
                        i++;
                }
-               
-               String str = new String(bytes, startPos, i-startPos);
+
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       
setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+                       return -1;
+               }
+
+               String str = new String(bytes, startPos, i - startPos);
+               int len = str.length();
                try {
                        this.result = Float.parseFloat(str);
-                       return (i == limit) ? limit : i+ delimiter.length;
-               }
-               catch (NumberFormatException e) {
+                       return (i == limit) ? limit : i + delimiter.length;
+               } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
                }
        }
-       
+
        @Override
        public Float createValue() {
                return Float.MIN_VALUE;
@@ -60,35 +66,35 @@ public class FloatParser extends FieldParser<Float> {
        public Float getLastResult() {
                return Float.valueOf(this.result);
        }
-       
+
        /**
-        * Static utility to parse a field of type float from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type float from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final float parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
-        * Static utility to parse a field of type float from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type float from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
-        * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final float parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
                if (length <= 0) {
@@ -96,12 +102,17 @@ public class FloatParser extends FieldParser<Float> {
                }
                int i = 0;
                final byte delByte = (byte) delimiter;
-               
+
                while (i < length && bytes[i] != delByte) {
                        i++;
                }
                
-               String str = new String(bytes, startPos, i);
+               String str = new String(bytes, startPos, i - startPos);
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the " +
+                               "numeric field: " + str);
+               }
+               int len = str.length();
                return Float.parseFloat(str);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index af16d4c..e8caac2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -33,7 +33,7 @@ public class FloatValueParser extends FieldParser<FloatValue> 
{
                
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
@@ -42,7 +42,11 @@ public class FloatValueParser extends 
FieldParser<FloatValue> {
                        i++;
                }
                
-               String str = new String(bytes, startPos, i-startPos);
+               String str = new String(bytes, startPos, i - startPos);
+               if (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+                       
setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+                       return -1;
+               }
                try {
                        float value = Float.parseFloat(str);
                        reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index c871f4a..dcd2ec2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -25,32 +25,38 @@ package org.apache.flink.types.parser;
  * The parser does not check for the maximum value.
  */
 public class IntParser extends FieldParser<Integer> {
-       
+
        private static final long OVERFLOW_BOUND = 0x7fffffffL;
        private static final long UNDERFLOW_BOUND = 0x80000000L;
 
        private int result;
-       
+
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer 
+               reusable) {
                long val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
-                       
+
                        // check for empty field with only the sign
-                       if (startPos == limit || ( startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, 
+                               delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
-               
+
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                this.result = (int) (neg ? -val : val);
                                return i + delimiter.length;
                        }
@@ -60,17 +66,17 @@ public class IntParser extends FieldParser<Integer> {
                        }
                        val *= 10;
                        val += bytes[i] - 48;
-                       
+
                        if (val > OVERFLOW_BOUND && (!neg || val > 
UNDERFLOW_BOUND)) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
                                return -1;
                        }
                }
-               
+
                this.result = (int) (neg ? -val : val);
                return limit;
        }
-       
+
        @Override
        public Integer createValue() {
                return Integer.MIN_VALUE;
@@ -80,40 +86,37 @@ public class IntParser extends FieldParser<Integer> {
        public Integer getLastResult() {
                return Integer.valueOf(this.result);
        }
-       
+
        /**
-        * Static utility to parse a field of type int from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type int from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final int parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
-        * Static utility to parse a field of type int from a byte sequence 
that represents text characters
+        * Static utility to parse a field of type int from a byte sequence 
that represents text 
+        * characters
         * (such as when read from a file stream).
-        * 
-        * @param bytes The bytes containing the text data that should be 
parsed.
-        * @param startPos The offset to start the parsing.
-        * @param length The length of the byte sequence (counting from the 
offset).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
         * @return The parsed value.
-        * 
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
         */
        public static final int parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
                long val = 0;
                boolean neg = false;
 
@@ -125,17 +128,17 @@ public class IntParser extends FieldParser<Integer> {
                                throw new NumberFormatException("Orphaned minus 
sign.");
                        }
                }
-               
+
                for (; length > 0; startPos++, length--) {
                        if (bytes[startPos] == delimiter) {
-                               return (int) (neg ? -val : val);
+                               throw new NumberFormatException("Empty field.");
                        }
                        if (bytes[startPos] < 48 || bytes[startPos] > 57) {
                                throw new NumberFormatException("Invalid 
character.");
                        }
                        val *= 10;
                        val += bytes[startPos] - 48;
-                       
+
                        if (val > OVERFLOW_BOUND && (!neg || val > 
UNDERFLOW_BOUND)) {
                                throw new NumberFormatException("Value 
overflow/underflow");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 8cb8176..abd8615 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -54,6 +54,10 @@ public class IntValueParser extends FieldParser<IntValue> {
                
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                reusable.setValue((int) (neg ? -val : val));
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index af17f15..bb6c7c9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -24,9 +24,9 @@ package org.apache.flink.types.parser;
  * Only characters '1' to '0' and '-' are allowed.
  */
 public class LongParser extends FieldParser<Long> {
-       
+
        private long result;
-       
+
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Long reusable) {
                long val = 0;
@@ -37,16 +37,20 @@ public class LongParser extends FieldParser<Long> {
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
-                       
+
                        // check for empty field with only the sign
                        if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
-               
+
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                this.result = neg ? -val : val;
                                return i + delimiter.length;
                        }
@@ -56,15 +60,15 @@ public class LongParser extends FieldParser<Long> {
                        }
                        val *= 10;
                        val += bytes[i] - 48;
-                       
+
                        // check for overflow / underflow
                        if (val < 0) {
                                // this is an overflow/underflow, unless we hit 
exactly the Long.MIN_VALUE
                                if (neg && val == Long.MIN_VALUE) {
                                        this.result = Long.MIN_VALUE;
-                                       
+
                                        if (i+1 >= limit) {
-                                               return limit; 
+                                               return limit;
                                        } else if (i+1 < delimLimit && 
delimiterNext(bytes, i+1, delimiter)) {
                                                return i + 1 + delimiter.length;
                                        } else {
@@ -78,57 +82,54 @@ public class LongParser extends FieldParser<Long> {
                                }
                        }
                }
-               
+
                this.result = neg ? -val : val;
                return limit;
        }
-       
+
        @Override
        public Long createValue() {
                return Long.MIN_VALUE;
        }
-       
+
        @Override
        public Long getLastResult() {
                return Long.valueOf(this.result);
        }
-       
+
        /**
         * Static utility to parse a field of type long from a byte sequence 
that represents text characters
         * (such as when read from a file stream).
-        * 
+        *
         * @param bytes The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
         * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        *
         * @return The parsed value.
-        * 
+        *
         * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
         */
        public static final long parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
         * Static utility to parse a field of type long from a byte sequence 
that represents text characters
         * (such as when read from a file stream).
-        * 
+        *
         * @param bytes The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
         * @param length The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
+        *
         * @return The parsed value.
-        * 
+        *
         * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
         */
        public static final long parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
                long val = 0;
                boolean neg = false;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
@@ -137,17 +138,17 @@ public class LongParser extends FieldParser<Long> {
                                throw new NumberFormatException("Orphaned minus 
sign.");
                        }
                }
-               
+
                for (; length > 0; startPos++, length--) {
                        if (bytes[startPos] == delimiter) {
-                               return neg ? -val : val;
+                               throw new NumberFormatException("Empty field.");
                        }
                        if (bytes[startPos] < 48 || bytes[startPos] > 57) {
                                throw new NumberFormatException("Invalid 
character.");
                        }
                        val *= 10;
                        val += bytes[startPos] - 48;
-                       
+
                        // check for overflow / underflow
                        if (val < 0) {
                                // this is an overflow/underflow, unless we hit 
exactly the Long.MIN_VALUE

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 8b697cc..a99a86e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -51,6 +51,10 @@ public class LongValueParser extends FieldParser<LongValue> {
                
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                reusable.setValue(neg ? -val : val);
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index a6f9898..6e04d60 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -25,10 +25,10 @@ package org.apache.flink.types.parser;
  * The parser does not check for the maximum value.
  */
 public class ShortParser extends FieldParser<Short> {
-       
+
        private static final int OVERFLOW_BOUND = 0x7fff;
        private static final int UNDERFLOW_BOUND = 0x8000;
-       
+
        private short result;
 
        @Override
@@ -37,20 +37,24 @@ public class ShortParser extends FieldParser<Short> {
                boolean neg = false;
 
                final int delimLimit = limit-delimiter.length+1;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
-                       
+
                        // check for empty field with only the sign
                        if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
-               
+
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                this.result = (short) (neg ? -val : val);
                                return i + delimiter.length;
                        }
@@ -60,17 +64,17 @@ public class ShortParser extends FieldParser<Short> {
                        }
                        val *= 10;
                        val += bytes[i] - 48;
-                       
+
                        if (val > OVERFLOW_BOUND && (!neg || val > 
UNDERFLOW_BOUND)) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
                                return -1;
                        }
                }
-               
+
                this.result = (short) (neg ? -val : val);
                return limit;
        }
-       
+
        @Override
        public Short createValue() {
                return Short.MIN_VALUE;
@@ -80,43 +84,40 @@ public class ShortParser extends FieldParser<Short> {
        public Short getLastResult() {
                return Short.valueOf(this.result);
        }
-       
+
        /**
         * Static utility to parse a field of type short from a byte sequence 
that represents text characters
         * (such as when read from a file stream).
-        * 
+        *
         * @param bytes The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
         * @param length The length of the byte sequence (counting from the 
offset).
-        * 
+        *
         * @return The parsed value.
-        * 
+        *
         * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
         */
        public static final short parseField(byte[] bytes, int startPos, int 
length) {
                return parseField(bytes, startPos, length, (char) 0xffff);
        }
-       
+
        /**
         * Static utility to parse a field of type short from a byte sequence 
that represents text characters
         * (such as when read from a file stream).
-        * 
+        *
         * @param bytes The bytes containing the text data that should be 
parsed.
         * @param startPos The offset to start the parsing.
         * @param length The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
-        * 
+        *
         * @return The parsed value.
-        * 
+        *
         * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text represents not a correct number.
         */
        public static final short parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
                long val = 0;
                boolean neg = false;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
@@ -125,17 +126,17 @@ public class ShortParser extends FieldParser<Short> {
                                throw new NumberFormatException("Orphaned minus 
sign.");
                        }
                }
-               
+
                for (; length > 0; startPos++, length--) {
                        if (bytes[startPos] == delimiter) {
-                               return (short) (neg ? -val : val);
+                               throw new NumberFormatException("Empty field.");
                        }
                        if (bytes[startPos] < 48 || bytes[startPos] > 57) {
                                throw new NumberFormatException("Invalid 
character.");
                        }
                        val *= 10;
                        val += bytes[startPos] - 48;
-                       
+
                        if (val > OVERFLOW_BOUND && (!neg || val > 
UNDERFLOW_BOUND)) {
                                throw new NumberFormatException("Value 
overflow/underflow");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index f5168cc..4289d1a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -54,6 +54,10 @@ public class ShortValueParser extends 
FieldParser<ShortValue> {
                
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       return -1;
+                               }
                                reusable.setValue((short) (neg ? -val : val));
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index 37d6903..ac49783 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -22,6 +22,10 @@ package org.apache.flink.types.parser;
 import org.apache.flink.types.parser.ByteParser;
 import org.apache.flink.types.parser.FieldParser;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class ByteParserTest extends ParserTestBase<Byte> {
 
@@ -43,7 +47,7 @@ public class ByteParserTest extends ParserTestBase<Byte> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "9a", "-57-6", "7-88", 
String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
-                       String.valueOf(Byte.MAX_VALUE + 1), 
String.valueOf(Byte.MIN_VALUE - 1)
+                       String.valueOf(Byte.MAX_VALUE + 1), 
String.valueOf(Byte.MIN_VALUE - 1),  " 1", "2 ", " ", "\t"
                };
        }
 
@@ -56,4 +60,5 @@ public class ByteParserTest extends ParserTestBase<Byte> {
        public Class<Byte> getTypeClass() {
                return Byte.class;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index a6c315a..1df3429 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -45,7 +45,7 @@ public class ByteValueParserTest extends 
ParserTestBase<ByteValue> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "9a", "-57-6", "7-88", 
String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
-                       String.valueOf(Byte.MAX_VALUE + 1), 
String.valueOf(Byte.MIN_VALUE - 1)
+                       String.valueOf(Byte.MAX_VALUE + 1), 
String.valueOf(Byte.MIN_VALUE - 1),  " 1", "2 ", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index 71e78a0..c68dd43 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -50,7 +50,7 @@ public class DoubleParserTest extends ParserTestBase<Double> {
        @Override
        public String[] getInvalidTestValues() {
                return new String[] {
-                       "a", "123abc4", "-57-6", "7-877678"
+                       "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", 
"\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index 120dfac..7908180 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -52,7 +52,7 @@ public class DoubleValueParserTest extends 
ParserTestBase<DoubleValue> {
        @Override
        public String[] getInvalidTestValues() {
                return new String[] {
-                       "a", "123abc4", "-57-6", "7-877678"
+                       "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", 
"\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index 3c450a5..012e353 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -50,7 +50,7 @@ public class FloatParserTest extends ParserTestBase<Float> {
        @Override
        public String[] getInvalidTestValues() {
                return new String[] {
-                       "a", "123abc4", "-57-6", "7-877678"
+                       "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", 
"\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index be5b5b8..2b85de0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -52,7 +52,7 @@ public class FloatValueParserTest extends 
ParserTestBase<FloatValue> {
        @Override
        public String[] getInvalidTestValues() {
                return new String[] {
-                       "a", "123abc4", "-57-6", "7-877678"
+                       "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", 
"\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 6e1d4db..0f11fbd 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -43,7 +43,7 @@ public class IntParserTest extends ParserTestBase<Integer> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
-                       String.valueOf(((long) Integer.MAX_VALUE) + 1), 
String.valueOf(((long) Integer.MIN_VALUE) - 1)
+                       String.valueOf(((long) Integer.MAX_VALUE) + 1), 
String.valueOf(((long) Integer.MIN_VALUE) - 1), " 1", "2 ", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index e32f704..2b6d72e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -45,7 +45,8 @@ public class IntValueParserTest extends 
ParserTestBase<IntValue> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
-                       String.valueOf(((long) Integer.MAX_VALUE) + 1), 
String.valueOf(((long) Integer.MIN_VALUE) - 1)
+                       String.valueOf(((long) Integer.MAX_VALUE) + 1), 
String.valueOf(((long) Integer.MIN_VALUE) - 1),
+                       " 1", "2 ", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index 4dd116b..2f7ac8f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -45,7 +45,7 @@ public class LongParserTest extends ParserTestBase<Long> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
-                       "9223372036854775808", "-9223372036854775809"
+                       "9223372036854775808", "-9223372036854775809", " 1", "2 
", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index fac6f42..2000907 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -47,7 +47,7 @@ public class LongValueParserTest extends 
ParserTestBase<LongValue> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
-                       "9223372036854775808", "-9223372036854775809"
+                       "9223372036854775808", "-9223372036854775809", " 1", "2 
", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index fb56add..dabac6f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.fail;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 
+import org.apache.flink.types.StringValue;
 import org.apache.flink.types.parser.FieldParser;
 import org.junit.Test;
 
@@ -45,7 +47,6 @@ public abstract class ParserTestBase<T> {
        
        public abstract Class<T> getTypeClass();
        
-
        @Test
        public void testTest() {
                assertNotNull(getParser());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> {
                                FieldParser<T> parser = getParser();
                                
                                byte[] bytes = testValues[i].getBytes();
-                               int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[] {'|'}, parser.createValue());
+                               int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
                                
                                assertTrue("Parser accepted the invalid value " 
+ testValues[i] + ".", numRead == -1);
                        }
@@ -402,4 +403,39 @@ public abstract class ParserTestBase<T> {
                return result;
        }
 
+       @Test
+       public void testEmptyFieldInIsolation() {
+               try {
+                       String [] emptyStrings = new String[] {"|"};
+
+                       FieldParser<T> parser = getParser();
+
+                       for (String emptyString : emptyStrings) {
+                               byte[] bytes = emptyString.getBytes();
+                               int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+
+                               if (getTypeClass() == String.class) {
+                                       assertTrue("Parser declared the empty 
string as invalid.", numRead != -1);
+                                       assertEquals("Invalid number of bytes 
read returned.", bytes.length, numRead);
+
+                                       T result = parser.getLastResult();
+                                       assertEquals("Parser parsed wrong.", 
"", result);
+                               } else if(getTypeClass() == StringValue.class) {
+                                       assertTrue("Parser declared the empty 
string as invalid.", numRead != -1);
+                                       assertEquals("Invalid number of bytes 
read returned.", bytes.length, numRead);
+
+                                       T result = parser.getLastResult();
+                                       assertEquals("Parser parsed wrong.", 
new StringValue(""), result);
+                               } else {
+                                       assertTrue("Parser accepted the empty 
string.", numRead == -1);
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test erroneous: " + e.getMessage());
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index 3f4cd02..baea30f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -43,7 +43,7 @@ public class ShortParserTest extends ParserTestBase<Short> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
-                       String.valueOf(Short.MAX_VALUE + 1), 
String.valueOf(Short.MIN_VALUE - 1)
+                       String.valueOf(Short.MAX_VALUE + 1), 
String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index 44f1589..c56df83 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -46,7 +46,7 @@ public class ShortValueParserTest extends 
ParserTestBase<ShortValue> {
        public String[] getInvalidTestValues() {
                return new String[] {
                        "a", "1569a86", "-57-6", "7-877678", 
String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
-                       String.valueOf(Short.MAX_VALUE + 1), 
String.valueOf(Short.MIN_VALUE - 1)
+                       String.valueOf(Short.MAX_VALUE + 1), 
String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index bff3fec..3d87984 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.io;
 
 import com.google.common.base.Charsets;
 
+import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -320,24 +321,24 @@ public class CsvInputFormatTest {
                        fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
                }
        }
-       
+
        @Test
-       public void testIntegerFieldsl() throws IOException {
+       public void testIntegerFields() throws IOException {
                try {
                        final String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|\n";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
+                       final FileInputSplit split = 
createTempFile(fileContent);
 
                        final TupleTypeInfo<Tuple5<Integer, Integer, Integer, 
Integer, Integer>> typeInfo =
-                                       
TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, 
Integer.class, Integer.class, Integer.class);
+                               
TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, 
Integer.class, Integer.class, Integer.class);
                        final CsvInputFormat<Tuple5<Integer, Integer, Integer, 
Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, 
Integer, Integer, Integer>>(PATH, typeInfo);
-                       
+
                        format.setFieldDelimiter("|");
 
                        format.configure(new Configuration());
                        format.open(split);
-                       
+
                        Tuple5<Integer, Integer, Integer, Integer, Integer> 
result = new Tuple5<Integer, Integer, Integer, Integer, Integer>();
-                       
+
                        result = format.nextRecord(result);
                        assertNotNull(result);
                        assertEquals(Integer.valueOf(111), result.f0);
@@ -345,7 +346,7 @@ public class CsvInputFormatTest {
                        assertEquals(Integer.valueOf(333), result.f2);
                        assertEquals(Integer.valueOf(444), result.f3);
                        assertEquals(Integer.valueOf(555), result.f4);
-                       
+
                        result = format.nextRecord(result);
                        assertNotNull(result);
                        assertEquals(Integer.valueOf(666), result.f0);
@@ -353,6 +354,104 @@ public class CsvInputFormatTest {
                        assertEquals(Integer.valueOf(888), result.f2);
                        assertEquals(Integer.valueOf(999), result.f3);
                        assertEquals(Integer.valueOf(000), result.f4);
+
+                       result = format.nextRecord(result);
+                       assertNull(result);
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
+               }
+       }
+
+       @Test
+       public void testEmptyFields() throws IOException {
+               try {
+                       final String fileContent = "|0|0|0|0|0|\n" +
+                               "1||1|1|1|1|\n" +
+                               "2|2||2|2|2|\n" +
+                               "3|3|3| |3|3|\n" +
+                               "4|4|4|4||4|\n" +
+                               "5|5|5|5|5||\n";
+                       final FileInputSplit split = 
createTempFile(fileContent);
+
+                       final TupleTypeInfo<Tuple6<Short, Integer, Long, Float, 
Double, Byte>> typeInfo =
+                               
TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, 
Float.class, Double.class, Byte.class);
+                       final CsvInputFormat<Tuple6<Short, Integer, Long, 
Float, Double, Byte>> format = new CsvInputFormat<Tuple6<Short, Integer, Long, 
Float, Double, Byte>>(PATH, typeInfo);
+
+                       format.setFieldDelimiter("|");
+
+                       format.configure(new Configuration());
+                       format.open(split);
+
+                       Tuple6<Short, Integer, Long, Float, Double, Byte> 
result = new Tuple6<Short, Integer, Long, Float, Double, Byte>();
+
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (ShortParser)");
+                       } catch (ParseException e) {}
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (IntegerParser)");
+                       } catch (ParseException e) {}
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (LongParser)");
+                       } catch (ParseException e) {}
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (FloatParser)");
+                       } catch (ParseException e) {}
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (DoubleParser)");
+                       } catch (ParseException e) {}
+                       try {
+                               result = format.nextRecord(result);
+                               fail("Empty String Parse Exception was not 
thrown! (ByteParser)");
+                       } catch (ParseException e) {}
+
+                       result = format.nextRecord(result);
+                       assertNull(result);
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
+               }
+       }
+
+       @Test
+       public void testDoubleFields() throws IOException {
+               try {
+                       final String fileContent = 
"11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+                       final FileInputSplit split = 
createTempFile(fileContent);       
+
+                       final TupleTypeInfo<Tuple5<Double, Double, Double, 
Double, Double>> typeInfo =
+                                       
TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, 
Double.class, Double.class);
+                       final CsvInputFormat<Tuple5<Double, Double, Double, 
Double, Double>> format = new CsvInputFormat<Tuple5<Double, Double, Double, 
Double, Double>>(PATH, typeInfo);
+                       
+                       format.setFieldDelimiter("|");
+
+                       format.configure(new Configuration());
+                       format.open(split);
+                       
+                       Tuple5<Double, Double, Double, Double, Double> result = 
new Tuple5<Double, Double, Double, Double, Double>();
+                       
+                       result = format.nextRecord(result);
+                       assertNotNull(result);
+                       assertEquals(Double.valueOf(11.1), result.f0);
+                       assertEquals(Double.valueOf(22.2), result.f1);
+                       assertEquals(Double.valueOf(33.3), result.f2);
+                       assertEquals(Double.valueOf(44.4), result.f3);
+                       assertEquals(Double.valueOf(55.5), result.f4);
+                       
+                       result = format.nextRecord(result);
+                       assertNotNull(result);
+                       assertEquals(Double.valueOf(66.6), result.f0);
+                       assertEquals(Double.valueOf(77.7), result.f1);
+                       assertEquals(Double.valueOf(88.8), result.f2);
+                       assertEquals(Double.valueOf(99.9), result.f3);
+                       assertEquals(Double.valueOf(00.0), result.f4);
                        
                        result = format.nextRecord(result);
                        assertNull(result);
@@ -367,7 +466,7 @@ public class CsvInputFormatTest {
        public void testReadFirstN() throws IOException {
                try {
                        final String fileContent = 
"111|222|333|444|555|\n666|777|888|999|000|\n";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
+                       final FileInputSplit split = 
createTempFile(fileContent);
 
                        final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo 
= TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
                        final CsvInputFormat<Tuple2<Integer, Integer>> format = 
new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
@@ -490,8 +589,9 @@ public class CsvInputFormatTest {
                        
                        format.setFieldDelimiter("&&");
 
-                       format.setFields(new boolean[] { true, false, false, 
true, false, false, false, true }, new Class<?>[] { Integer.class,
-                                       Integer.class, Integer.class });
+                       format.setFields(new boolean[]{true, false, false, 
true, false, false, false, true}, new 
+                               Class<?>[]{Integer.class,
+                               Integer.class, Integer.class});
                        
                        format.configure(new Configuration());
                        format.open(split);
@@ -547,7 +647,7 @@ public class CsvInputFormatTest {
 
                Object[][] failures = {
                                {"\"string\" trailing", 
FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
-                               {"\"unterminated ",             
FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
+                               {"\"unterminated ", 
FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
                };
 
                for (Object[] failure : failures) {
@@ -809,7 +909,8 @@ public class CsvInputFormatTest {
                @SuppressWarnings("unchecked")
                TypeInformation<PojoItem> typeInfo = 
(TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
                CsvInputFormat<PojoItem> inputFormat = new 
CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
-               inputFormat.setFields(new boolean[]{true, false, true, false, 
true, true}, new Class[]{Integer.class, String.class, Double.class, 
String.class});
+               inputFormat.setFields(new boolean[]{true, false, true, false, 
true, true}, new Class[]{Integer.class, String
+                       .class, Double.class, String.class});
 
                inputFormat.configure(new Configuration());
                FileInputSplit[] splits = inputFormat.createInputSplits(1);

Reply via email to