[FLINK-1820] Consistent behavior of numeric value parsers. This closes #566
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39d526e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39d526e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39d526e6 Branch: refs/heads/master Commit: 39d526e6f8b26ff35e1023c65293982285ffcc78 Parents: 6403dbd Author: FelixNeutatz <neut...@googlemail.com> Authored: Fri Apr 3 18:13:32 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue May 19 00:52:29 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/types/parser/ByteParser.java | 68 +++++----- .../flink/types/parser/ByteValueParser.java | 4 + .../apache/flink/types/parser/DoubleParser.java | 69 +++++----- .../flink/types/parser/DoubleValueParser.java | 8 +- .../apache/flink/types/parser/FieldParser.java | 8 +- .../apache/flink/types/parser/FloatParser.java | 69 +++++----- .../flink/types/parser/FloatValueParser.java | 8 +- .../apache/flink/types/parser/IntParser.java | 69 +++++----- .../flink/types/parser/IntValueParser.java | 4 + .../apache/flink/types/parser/LongParser.java | 51 ++++---- .../flink/types/parser/LongValueParser.java | 4 + .../apache/flink/types/parser/ShortParser.java | 47 +++---- .../flink/types/parser/ShortValueParser.java | 4 + .../flink/types/parser/ByteParserTest.java | 7 +- .../flink/types/parser/ByteValueParserTest.java | 2 +- .../flink/types/parser/DoubleParserTest.java | 2 +- .../types/parser/DoubleValueParserTest.java | 2 +- .../flink/types/parser/FloatParserTest.java | 2 +- .../types/parser/FloatValueParserTest.java | 2 +- .../flink/types/parser/IntParserTest.java | 2 +- .../flink/types/parser/IntValueParserTest.java | 3 +- .../flink/types/parser/LongParserTest.java | 2 +- .../flink/types/parser/LongValueParserTest.java | 2 +- .../flink/types/parser/ParserTestBase.java | 40 +++++- .../flink/types/parser/ShortParserTest.java | 2 +- .../types/parser/ShortValueParserTest.java | 2 +- .../flink/api/java/io/CsvInputFormatTest.java | 127 +++++++++++++++++-- 27 files changed, 405 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index 5858da2..09e517a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -21,22 +21,23 @@ package org.apache.flink.types.parser; public class ByteParser extends FieldParser<Byte> { - + private byte result; - + @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; - + final int delimLimit = limit - delimiter.length + 1; + if (bytes[startPos] == '-') { neg = true; startPos++; - + // check for empty field with only the sign - if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, + delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } @@ -44,6 +45,10 @@ public class ByteParser extends FieldParser<Byte> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } this.result = (byte) (neg ? -val : val); return i + delimiter.length; } @@ -53,17 +58,17 @@ public class ByteParser extends FieldParser<Byte> { } val *= 10; val += bytes[i] - 48; - + if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); return -1; } } - + this.result = (byte) (neg ? -val : val); return limit; } - + @Override public Byte createValue() { return Byte.MIN_VALUE; @@ -73,43 +78,40 @@ public class ByteParser extends FieldParser<Byte> { public Byte getLastResult() { return Byte.valueOf(this.result); } - + /** - * Static utility to parse a field of type byte from a byte sequence that represents text characters + * Static utility to parse a field of type byte from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. + * + * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). - * + * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final byte parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** - * Static utility to parse a field of type byte from a byte sequence that represents text characters + * Static utility to parse a field of type byte from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. - * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final byte parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } long val = 0; boolean neg = false; - + if (bytes[startPos] == '-') { neg = true; startPos++; @@ -118,17 +120,17 @@ public class ByteParser extends FieldParser<Byte> { throw new NumberFormatException("Orphaned minus sign."); } } - + for (; length > 0; startPos++, length--) { if (bytes[startPos] == delimiter) { - return (byte) (neg ? -val : val); + throw new NumberFormatException("Empty field."); } if (bytes[startPos] < 48 || bytes[startPos] > 57) { throw new NumberFormatException("Invalid character."); } val *= 10; val += bytes[startPos] - 48; - + if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) { throw new NumberFormatException("Value overflow/underflow"); } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index f9b36e4..612a1cb 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -52,6 +52,10 @@ public class ByteValueParser extends FieldParser<ByteValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } reusable.setValue((byte) (neg ? -val : val)); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 947fdfe..086c1f5 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -23,35 +23,39 @@ package org.apache.flink.types.parser; * Parses a text field into a Double. */ public class DoubleParser extends FieldParser<Double> { - + private static final Double DOUBLE_INSTANCE = Double.valueOf(0.0); - + private double result; - + @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { int i = startPos; - final int delimLimit = limit-delimiter.length+1; - + final int delimLimit = limit - delimiter.length + 1; + while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { break; } i++; } - - String str = new String(bytes, startPos, i-startPos); + + String str = new String(bytes, startPos, i - startPos); + int len = str.length(); + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD); + return -1; + } try { this.result = Double.parseDouble(str); return (i == limit) ? limit : i + delimiter.length; - } - catch (NumberFormatException e) { + } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; } } - + @Override public Double createValue() { return DOUBLE_INSTANCE; @@ -61,35 +65,35 @@ public class DoubleParser extends FieldParser<Double> { public Double getLastResult() { return Double.valueOf(this.result); } - + /** - * Static utility to parse a field of type double from a byte sequence that represents text characters + * Static utility to parse a field of type double from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. + * + * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). - * + * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final double parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** - * Static utility to parse a field of type double from a byte sequence that represents text characters + * Static utility to parse a field of type double from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. - * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) { if (length <= 0) { @@ -97,12 +101,17 @@ public class DoubleParser extends FieldParser<Double> { } int i = 0; final byte delByte = (byte) delimiter; - + while (i < length && bytes[i] != delByte) { i++; } - - String str = new String(bytes, startPos, i); + + String str = new String(bytes, startPos, i - startPos); + int len = str.length(); + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + throw new NumberFormatException("There is leading or trailing whitespace in the " + + "numeric field: " + str); + } return Double.parseDouble(str); } } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index e225c1f..7751831 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -33,7 +33,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { @@ -42,7 +42,11 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { i++; } - String str = new String(bytes, startPos, i-startPos); + String str = new String(bytes, startPos, i - startPos); + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD); + return -1; + } try { double value = Double.parseDouble(str); reusable.setValue(value); http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 33697fd..55e9915 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -64,7 +64,13 @@ public abstract class FieldParser<T> { UNTERMINATED_QUOTED_STRING, /** The parser found characters between the end of the quoted string and the delimiter. */ - UNQUOTED_CHARS_AFTER_QUOTED_STRING + UNQUOTED_CHARS_AFTER_QUOTED_STRING, + + /** The string is empty. */ + EMPTY_STRING, + + /** There is whitespace in a numeric field. */ + WHITESPACE_IN_NUMERIC_FIELD } private ParseErrorState errorState = ParseErrorState.NONE; http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index 7d166c7..be98aa1 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -23,15 +23,16 @@ package org.apache.flink.types.parser; * Parses a text field into a {@link Float}. */ public class FloatParser extends FieldParser<Float> { - + private float result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) { - + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float + reusable) { + int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { @@ -39,18 +40,23 @@ public class FloatParser extends FieldParser<Float> { } i++; } - - String str = new String(bytes, startPos, i-startPos); + + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD); + return -1; + } + + String str = new String(bytes, startPos, i - startPos); + int len = str.length(); try { this.result = Float.parseFloat(str); - return (i == limit) ? limit : i+ delimiter.length; - } - catch (NumberFormatException e) { + return (i == limit) ? limit : i + delimiter.length; + } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; } } - + @Override public Float createValue() { return Float.MIN_VALUE; @@ -60,35 +66,35 @@ public class FloatParser extends FieldParser<Float> { public Float getLastResult() { return Float.valueOf(this.result); } - + /** - * Static utility to parse a field of type float from a byte sequence that represents text characters + * Static utility to parse a field of type float from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. + * + * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). - * + * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final float parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** - * Static utility to parse a field of type float from a byte sequence that represents text characters + * Static utility to parse a field of type float from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. - * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) { if (length <= 0) { @@ -96,12 +102,17 @@ public class FloatParser extends FieldParser<Float> { } int i = 0; final byte delByte = (byte) delimiter; - + while (i < length && bytes[i] != delByte) { i++; } - String str = new String(bytes, startPos, i); + String str = new String(bytes, startPos, i - startPos); + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + throw new NumberFormatException("There is leading or trailing whitespace in the " + + "numeric field: " + str); + } + int len = str.length(); return Float.parseFloat(str); } } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index af16d4c..e8caac2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -33,7 +33,7 @@ public class FloatValueParser extends FieldParser<FloatValue> { int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { @@ -42,7 +42,11 @@ public class FloatValueParser extends FieldParser<FloatValue> { i++; } - String str = new String(bytes, startPos, i-startPos); + String str = new String(bytes, startPos, i - startPos); + if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) { + setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD); + return -1; + } try { float value = Float.parseFloat(str); reusable.setValue(value); http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index c871f4a..dcd2ec2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -25,32 +25,38 @@ package org.apache.flink.types.parser; * The parser does not check for the maximum value. */ public class IntParser extends FieldParser<Integer> { - + private static final long OVERFLOW_BOUND = 0x7fffffffL; private static final long UNDERFLOW_BOUND = 0x80000000L; private int result; - + @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer + reusable) { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; startPos++; - + // check for empty field with only the sign - if (startPos == limit || ( startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, + delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } - + for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } this.result = (int) (neg ? -val : val); return i + delimiter.length; } @@ -60,17 +66,17 @@ public class IntParser extends FieldParser<Integer> { } val *= 10; val += bytes[i] - 48; - + if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); return -1; } } - + this.result = (int) (neg ? -val : val); return limit; } - + @Override public Integer createValue() { return Integer.MIN_VALUE; @@ -80,40 +86,37 @@ public class IntParser extends FieldParser<Integer> { public Integer getLastResult() { return Integer.valueOf(this.result); } - + /** - * Static utility to parse a field of type int from a byte sequence that represents text characters + * Static utility to parse a field of type int from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. + * + * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). - * + * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final int parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** - * Static utility to parse a field of type int from a byte sequence that represents text characters + * Static utility to parse a field of type int from a byte sequence that represents text + * characters * (such as when read from a file stream). - * - * @param bytes The bytes containing the text data that should be parsed. - * @param startPos The offset to start the parsing. - * @param length The length of the byte sequence (counting from the offset). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * * @return The parsed value. - * - * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. */ public static final int parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } long val = 0; boolean neg = false; @@ -125,17 +128,17 @@ public class IntParser extends FieldParser<Integer> { throw new NumberFormatException("Orphaned minus sign."); } } - + for (; length > 0; startPos++, length--) { if (bytes[startPos] == delimiter) { - return (int) (neg ? -val : val); + throw new NumberFormatException("Empty field."); } if (bytes[startPos] < 48 || bytes[startPos] > 57) { throw new NumberFormatException("Invalid character."); } val *= 10; val += bytes[startPos] - 48; - + if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) { throw new NumberFormatException("Value overflow/underflow"); } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index 8cb8176..abd8615 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -54,6 +54,10 @@ public class IntValueParser extends FieldParser<IntValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } reusable.setValue((int) (neg ? -val : val)); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index af17f15..bb6c7c9 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -24,9 +24,9 @@ package org.apache.flink.types.parser; * Only characters '1' to '0' and '-' are allowed. */ public class LongParser extends FieldParser<Long> { - + private long result; - + @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { long val = 0; @@ -37,16 +37,20 @@ public class LongParser extends FieldParser<Long> { if (bytes[startPos] == '-') { neg = true; startPos++; - + // check for empty field with only the sign if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } - + for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } this.result = neg ? -val : val; return i + delimiter.length; } @@ -56,15 +60,15 @@ public class LongParser extends FieldParser<Long> { } val *= 10; val += bytes[i] - 48; - + // check for overflow / underflow if (val < 0) { // this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE if (neg && val == Long.MIN_VALUE) { this.result = Long.MIN_VALUE; - + if (i+1 >= limit) { - return limit; + return limit; } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { return i + 1 + delimiter.length; } else { @@ -78,57 +82,54 @@ public class LongParser extends FieldParser<Long> { } } } - + this.result = neg ? -val : val; return limit; } - + @Override public Long createValue() { return Long.MIN_VALUE; } - + @Override public Long getLastResult() { return Long.valueOf(this.result); } - + /** * Static utility to parse a field of type long from a byte sequence that represents text characters * (such as when read from a file stream). - * + * * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). - * + * * @return The parsed value. - * + * * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. */ public static final long parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** * Static utility to parse a field of type long from a byte sequence that represents text characters * (such as when read from a file stream). - * + * * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * + * * @return The parsed value. - * + * * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. */ public static final long parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } long val = 0; boolean neg = false; - + if (bytes[startPos] == '-') { neg = true; startPos++; @@ -137,17 +138,17 @@ public class LongParser extends FieldParser<Long> { throw new NumberFormatException("Orphaned minus sign."); } } - + for (; length > 0; startPos++, length--) { if (bytes[startPos] == delimiter) { - return neg ? -val : val; + throw new NumberFormatException("Empty field."); } if (bytes[startPos] < 48 || bytes[startPos] > 57) { throw new NumberFormatException("Invalid character."); } val *= 10; val += bytes[startPos] - 48; - + // check for overflow / underflow if (val < 0) { // this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index 8b697cc..a99a86e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -51,6 +51,10 @@ public class LongValueParser extends FieldParser<LongValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } reusable.setValue(neg ? -val : val); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index a6f9898..6e04d60 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -25,10 +25,10 @@ package org.apache.flink.types.parser; * The parser does not check for the maximum value. */ public class ShortParser extends FieldParser<Short> { - + private static final int OVERFLOW_BOUND = 0x7fff; private static final int UNDERFLOW_BOUND = 0x8000; - + private short result; @Override @@ -37,20 +37,24 @@ public class ShortParser extends FieldParser<Short> { boolean neg = false; final int delimLimit = limit-delimiter.length+1; - + if (bytes[startPos] == '-') { neg = true; startPos++; - + // check for empty field with only the sign if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } - + for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } this.result = (short) (neg ? -val : val); return i + delimiter.length; } @@ -60,17 +64,17 @@ public class ShortParser extends FieldParser<Short> { } val *= 10; val += bytes[i] - 48; - + if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); return -1; } } - + this.result = (short) (neg ? -val : val); return limit; } - + @Override public Short createValue() { return Short.MIN_VALUE; @@ -80,43 +84,40 @@ public class ShortParser extends FieldParser<Short> { public Short getLastResult() { return Short.valueOf(this.result); } - + /** * Static utility to parse a field of type short from a byte sequence that represents text characters * (such as when read from a file stream). - * + * * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). - * + * * @return The parsed value. - * + * * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. */ public static final short parseField(byte[] bytes, int startPos, int length) { return parseField(bytes, startPos, length, (char) 0xffff); } - + /** * Static utility to parse a field of type short from a byte sequence that represents text characters * (such as when read from a file stream). - * + * * @param bytes The bytes containing the text data that should be parsed. * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. - * + * * @return The parsed value. - * + * * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number. */ public static final short parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } long val = 0; boolean neg = false; - + if (bytes[startPos] == '-') { neg = true; startPos++; @@ -125,17 +126,17 @@ public class ShortParser extends FieldParser<Short> { throw new NumberFormatException("Orphaned minus sign."); } } - + for (; length > 0; startPos++, length--) { if (bytes[startPos] == delimiter) { - return (short) (neg ? -val : val); + throw new NumberFormatException("Empty field."); } if (bytes[startPos] < 48 || bytes[startPos] > 57) { throw new NumberFormatException("Invalid character."); } val *= 10; val += bytes[startPos] - 48; - + if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) { throw new NumberFormatException("Value overflow/underflow"); } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index f5168cc..4289d1a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -54,6 +54,10 @@ public class ShortValueParser extends FieldParser<ShortValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } reusable.setValue((short) (neg ? -val : val)); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java index 37d6903..ac49783 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java @@ -22,6 +22,10 @@ package org.apache.flink.types.parser; import org.apache.flink.types.parser.ByteParser; import org.apache.flink.types.parser.FieldParser; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class ByteParserTest extends ParserTestBase<Byte> { @@ -43,7 +47,7 @@ public class ByteParserTest extends ParserTestBase<Byte> { public String[] getInvalidTestValues() { return new String[] { "a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE), - String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1) + String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1), " 1", "2 ", " ", "\t" }; } @@ -56,4 +60,5 @@ public class ByteParserTest extends ParserTestBase<Byte> { public Class<Byte> getTypeClass() { return Byte.class; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java index a6c315a..1df3429 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java @@ -45,7 +45,7 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> { public String[] getInvalidTestValues() { return new String[] { "a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE), - String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1) + String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1), " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java index 71e78a0..c68dd43 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java @@ -50,7 +50,7 @@ public class DoubleParserTest extends ParserTestBase<Double> { @Override public String[] getInvalidTestValues() { return new String[] { - "a", "123abc4", "-57-6", "7-877678" + "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java index 120dfac..7908180 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java @@ -52,7 +52,7 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> { @Override public String[] getInvalidTestValues() { return new String[] { - "a", "123abc4", "-57-6", "7-877678" + "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java index 3c450a5..012e353 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java @@ -50,7 +50,7 @@ public class FloatParserTest extends ParserTestBase<Float> { @Override public String[] getInvalidTestValues() { return new String[] { - "a", "123abc4", "-57-6", "7-877678" + "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java index be5b5b8..2b85de0 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java @@ -52,7 +52,7 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> { @Override public String[] getInvalidTestValues() { return new String[] { - "a", "123abc4", "-57-6", "7-877678" + "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java index 6e1d4db..0f11fbd 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java @@ -43,7 +43,7 @@ public class IntParserTest extends ParserTestBase<Integer> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE), - String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1) + String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1), " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java index e32f704..2b6d72e 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java @@ -45,7 +45,8 @@ public class IntValueParserTest extends ParserTestBase<IntValue> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE), - String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1) + String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1), + " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java index 4dd116b..2f7ac8f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java @@ -45,7 +45,7 @@ public class LongParserTest extends ParserTestBase<Long> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0", - "9223372036854775808", "-9223372036854775809" + "9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java index fac6f42..2000907 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java @@ -47,7 +47,7 @@ public class LongValueParserTest extends ParserTestBase<LongValue> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0", - "9223372036854775808", "-9223372036854775809" + "9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java index fb56add..dabac6f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java @@ -25,7 +25,9 @@ import static org.junit.Assert.fail; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Arrays; +import org.apache.flink.types.StringValue; import org.apache.flink.types.parser.FieldParser; import org.junit.Test; @@ -45,7 +47,6 @@ public abstract class ParserTestBase<T> { public abstract Class<T> getTypeClass(); - @Test public void testTest() { assertNotNull(getParser()); @@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> { FieldParser<T> parser = getParser(); byte[] bytes = testValues[i].getBytes(); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[] {'|'}, parser.createValue()); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1); } @@ -402,4 +403,39 @@ public abstract class ParserTestBase<T> { return result; } + @Test + public void testEmptyFieldInIsolation() { + try { + String [] emptyStrings = new String[] {"|"}; + + FieldParser<T> parser = getParser(); + + for (String emptyString : emptyStrings) { + byte[] bytes = emptyString.getBytes(); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + if (getTypeClass() == String.class) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + + T result = parser.getLastResult(); + assertEquals("Parser parsed wrong.", "", result); + } else if(getTypeClass() == StringValue.class) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + + T result = parser.getLastResult(); + assertEquals("Parser parsed wrong.", new StringValue(""), result); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java index 3f4cd02..baea30f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java @@ -43,7 +43,7 @@ public class ShortParserTest extends ParserTestBase<Short> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE), - String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1) + String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java index 44f1589..c56df83 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java @@ -46,7 +46,7 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> { public String[] getInvalidTestValues() { return new String[] { "a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE), - String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1) + String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index bff3fec..3d87984 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.java.io; import com.google.common.base.Charsets; +import org.apache.flink.api.common.io.ParseException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.*; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -320,24 +321,24 @@ public class CsvInputFormatTest { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test - public void testIntegerFieldsl() throws IOException { + public void testIntegerFields() throws IOException { try { final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo = - TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class); + TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class); final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo); - + format.setFieldDelimiter("|"); format.configure(new Configuration()); format.open(split); - + Tuple5<Integer, Integer, Integer, Integer, Integer> result = new Tuple5<Integer, Integer, Integer, Integer, Integer>(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(111), result.f0); @@ -345,7 +346,7 @@ public class CsvInputFormatTest { assertEquals(Integer.valueOf(333), result.f2); assertEquals(Integer.valueOf(444), result.f3); assertEquals(Integer.valueOf(555), result.f4); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(666), result.f0); @@ -353,6 +354,104 @@ public class CsvInputFormatTest { assertEquals(Integer.valueOf(888), result.f2); assertEquals(Integer.valueOf(999), result.f3); assertEquals(Integer.valueOf(000), result.f4); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } + + @Test + public void testEmptyFields() throws IOException { + try { + final String fileContent = "|0|0|0|0|0|\n" + + "1||1|1|1|1|\n" + + "2|2||2|2|2|\n" + + "3|3|3| |3|3|\n" + + "4|4|4|4||4|\n" + + "5|5|5|5|5||\n"; + final FileInputSplit split = createTempFile(fileContent); + + final TupleTypeInfo<Tuple6<Short, Integer, Long, Float, Double, Byte>> typeInfo = + TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, Float.class, Double.class, Byte.class); + final CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>> format = new CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>>(PATH, typeInfo); + + format.setFieldDelimiter("|"); + + format.configure(new Configuration()); + format.open(split); + + Tuple6<Short, Integer, Long, Float, Double, Byte> result = new Tuple6<Short, Integer, Long, Float, Double, Byte>(); + + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (ShortParser)"); + } catch (ParseException e) {} + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (IntegerParser)"); + } catch (ParseException e) {} + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (LongParser)"); + } catch (ParseException e) {} + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (FloatParser)"); + } catch (ParseException e) {} + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (DoubleParser)"); + } catch (ParseException e) {} + try { + result = format.nextRecord(result); + fail("Empty String Parse Exception was not thrown! (ByteParser)"); + } catch (ParseException e) {} + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } + + @Test + public void testDoubleFields() throws IOException { + try { + final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"; + final FileInputSplit split = createTempFile(fileContent); + + final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo = + TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class); + final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo); + + format.setFieldDelimiter("|"); + + format.configure(new Configuration()); + format.open(split); + + Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>(); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Double.valueOf(11.1), result.f0); + assertEquals(Double.valueOf(22.2), result.f1); + assertEquals(Double.valueOf(33.3), result.f2); + assertEquals(Double.valueOf(44.4), result.f3); + assertEquals(Double.valueOf(55.5), result.f4); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Double.valueOf(66.6), result.f0); + assertEquals(Double.valueOf(77.7), result.f1); + assertEquals(Double.valueOf(88.8), result.f2); + assertEquals(Double.valueOf(99.9), result.f3); + assertEquals(Double.valueOf(00.0), result.f4); result = format.nextRecord(result); assertNull(result); @@ -367,7 +466,7 @@ public class CsvInputFormatTest { public void testReadFirstN() throws IOException { try { final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class); final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo); @@ -490,8 +589,9 @@ public class CsvInputFormatTest { format.setFieldDelimiter("&&"); - format.setFields(new boolean[] { true, false, false, true, false, false, false, true }, new Class<?>[] { Integer.class, - Integer.class, Integer.class }); + format.setFields(new boolean[]{true, false, false, true, false, false, false, true}, new + Class<?>[]{Integer.class, + Integer.class, Integer.class}); format.configure(new Configuration()); format.open(split); @@ -547,7 +647,7 @@ public class CsvInputFormatTest { Object[][] failures = { {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, - {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING} + {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING} }; for (Object[] failure : failures) { @@ -809,7 +909,8 @@ public class CsvInputFormatTest { @SuppressWarnings("unchecked") TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class); CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo); - inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class}); + inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String + .class, Double.class, String.class}); inputFormat.configure(new Configuration()); FileInputSplit[] splits = inputFormat.createInputSplits(1);