Repository: flink
Updated Branches:
  refs/heads/release-1.4 e7395efec -> 1c7a7fa70


[FLINK-8331][core] FieldParser does not correctly set EMPTY_COLUMN error state.

This closes #5218


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c7a7fa7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c7a7fa7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c7a7fa7

Branch: refs/heads/release-1.4
Commit: 1c7a7fa706e9b373bbe395398507378a61759190
Parents: e7395ef
Author: 金竹 <jincheng.su...@alibaba-inc.com>
Authored: Sat Dec 30 11:10:18 2017 +0800
Committer: sunjincheng121 <sunjincheng...@gmail.com>
Committed: Sat Jan 6 00:45:31 2018 +0800

----------------------------------------------------------------------
 .../flink/types/parser/BooleanParser.java       |  21 ++--
 .../apache/flink/types/parser/ByteParser.java   |   6 ++
 .../flink/types/parser/ByteValueParser.java     |   8 +-
 .../apache/flink/types/parser/FieldParser.java  |   8 +-
 .../apache/flink/types/parser/IntParser.java    |   9 +-
 .../flink/types/parser/IntValueParser.java      |   8 +-
 .../apache/flink/types/parser/LongParser.java   |   6 ++
 .../flink/types/parser/LongValueParser.java     |   6 ++
 .../apache/flink/types/parser/ShortParser.java  |   7 ++
 .../flink/types/parser/ShortValueParser.java    |   8 +-
 .../apache/flink/types/parser/StringParser.java |  10 +-
 .../flink/types/parser/StringValueParser.java   |  10 +-
 .../flink/types/parser/FieldParserTest.java     | 105 ++++++++++++++++++-
 .../flink/types/parser/ParserTestBase.java      |  27 ++---
 14 files changed, 194 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index 908c05f..3a6178a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -37,34 +37,25 @@ public class BooleanParser extends FieldParser<Boolean> {
        };
 
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delim, Boolean reuse) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Boolean reuse) {
 
-               final int delimLimit = limit - delim.length + 1;
+               final int i = nextStringEndPos(bytes, startPos, limit, 
delimiter);
 
-               int i = startPos;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, delim)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+               if (i < 0) {
+                       return -1;
                }
 
                for (byte[] aTRUE : TRUE) {
                        if (byteArrayEquals(bytes, startPos, i - startPos, 
aTRUE)) {
                                result = true;
-                               return (i == limit) ? limit : i + delim.length;
+                               return (i == limit) ? limit : i + 
delimiter.length;
                        }
                }
 
                for (byte[] aFALSE : FALSE) {
                        if (byteArrayEquals(bytes, startPos, i - startPos, 
aFALSE)) {
                                result = false;
-                               return (i == limit) ? limit : i + delim.length;
+                               return (i == limit) ? limit : i + 
delimiter.length;
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 7ee257e..79e9f1f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -28,6 +28,12 @@ public class ByteParser extends FieldParser<Byte> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Byte reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                int val = 0;
                boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index c79f5d4..e5b8e47 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -33,6 +33,12 @@ public class ByteValueParser extends FieldParser<ByteValue> {
        
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, ByteValue reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                int val = 0;
                boolean neg = false;
                
@@ -73,7 +79,7 @@ public class ByteValueParser extends FieldParser<ByteValue> {
                                return -1;
                        }
                }
-               
+
                reusable.setValue((byte) (neg ? -val : val));
                return limit;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index c45f820..59cc2e7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -210,15 +210,15 @@ public abstract class FieldParser<T> {
 
                while (endPos < limit) {
                        if (endPos < delimLimit && delimiterNext(bytes, endPos, 
delimiter)) {
-                               if (endPos == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
                                break;
                        }
                        endPos++;
                }
 
+               if (endPos == startPos) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
                return endPos;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index 4e5d43f..5135a99 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -35,8 +35,13 @@ public class IntParser extends FieldParser<Integer> {
        private int result;
 
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer 
-               reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                long val = 0;
                boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 0229bc7..0c15f90 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -36,6 +36,12 @@ public class IntValueParser extends FieldParser<IntValue> {
        
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, IntValue reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                long val = 0;
                boolean neg = false;
 
@@ -75,7 +81,7 @@ public class IntValueParser extends FieldParser<IntValue> {
                                return -1;
                        }
                }
-               
+
                reusable.setValue((int) (neg ? -val : val));
                return limit;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index 79eb080..11ed1a4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -32,6 +32,12 @@ public class LongParser extends FieldParser<Long> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Long reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                long val = 0;
                boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 5ddd40c..91b8aab 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -33,6 +33,12 @@ public class LongValueParser extends FieldParser<LongValue> {
        
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, LongValue reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                long val = 0;
                boolean neg = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index c458a3f..20f727d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -36,6 +36,12 @@ public class ShortParser extends FieldParser<Short> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Short reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                int val = 0;
                boolean neg = false;
 
@@ -148,6 +154,7 @@ public class ShortParser extends FieldParser<Short> {
                                throw new NumberFormatException("Value 
overflow/underflow");
                        }
                }
+
                return (short) (neg ? -val : val);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index 47471a3..0332ba8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -36,6 +36,12 @@ public class ShortValueParser extends FieldParser<ShortValue> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, ShortValue reusable) {
+
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       return -1;
+               }
+
                int val = 0;
                boolean neg = false;
 
@@ -75,7 +81,7 @@ public class ShortValueParser extends FieldParser<ShortValue> {
                                return -1;
                        }
                }
-               
+
                reusable.setValue((short) (neg ? -val : val));
                return limit;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 7b46a7e..bb1c04f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -42,6 +42,12 @@ public class StringParser extends FieldParser<String> {
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
 
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       this.result = "";
+                       return limit;
+               }
+
                int i = startPos;
 
                final int delimLimit = limit - delimiter.length + 1;
@@ -83,10 +89,6 @@ public class StringParser extends FieldParser<String> {
                        }
 
                        if (i >= delimLimit) {
-                               // no delimiter found. Take the full string
-                               if (limit == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
-                               }
                                this.result = new String(bytes, startPos, limit 
- startPos, getCharset());
                                return limit;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index c72b029..193babb 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -45,6 +45,12 @@ public class StringValueParser extends FieldParser<StringValue> {
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, StringValue reusable) {
 
+               if (startPos == limit) {
+                       setErrorState(ParseErrorState.EMPTY_COLUMN);
+                       reusable.setValueAscii(bytes, startPos, 0);
+                       return limit;
+               }
+
                this.result = reusable;
                int i = startPos;
 
@@ -89,10 +95,6 @@ public class StringValueParser extends FieldParser<StringValue> {
                        }
 
                        if (i >= delimLimit) {
-                               // no delimiter found. Take the full string
-                               if (limit == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
-                               }
                                reusable.setValueAscii(bytes, startPos, limit - 
startPos);
                                return limit;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
index bcb2bfb..ad46742 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.types.parser;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -43,4 +46,104 @@ public class FieldParserTest {
                assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
        }
 
-}
\ No newline at end of file
+       @Test
+       public void testNextStringEndPos() throws Exception {
+
+               FieldParser parser = new TestFieldParser<String>();
+               // single-char delimiter
+               byte[] singleCharDelim = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+               byte[] bytes1 = "a|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               assertEquals(1, parser.nextStringEndPos(bytes1, 0, 
bytes1.length, singleCharDelim));
+               assertEquals(-1, parser.nextStringEndPos(bytes1, 1, 
bytes1.length, singleCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(bytes1, 1, 1, 
singleCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(bytes1, 2, 
bytes1.length, singleCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               byte[] bytes2 = "a||".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(bytes2, 1, 
bytes2.length, singleCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               byte[] bytes3 = "a|c".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(bytes3, 1, 
bytes3.length, singleCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               parser.resetParserState();
+               assertEquals(3, parser.nextStringEndPos(bytes3, 2, 
bytes3.length, singleCharDelim));
+               assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+               byte[] bytes4 = 
"a|c|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(3, parser.nextStringEndPos(bytes4, 2, 
bytes4.length, singleCharDelim));
+               assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+               // multi-char delimiter
+               byte[] multiCharDelim = 
"|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               byte[] mBytes1 = 
"a|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(1, parser.nextStringEndPos(mBytes1, 0, 
mBytes1.length, multiCharDelim));
+               assertEquals(-1, parser.nextStringEndPos(mBytes1, 1, 
mBytes1.length, multiCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(mBytes1, 1, 1, 
multiCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               byte[] mBytes2 = 
"a|#||#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(mBytes2, 1, 
mBytes2.length, multiCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               byte[] mBytes3 = 
"a|#|b".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(-1, parser.nextStringEndPos(mBytes3, 1, 
mBytes3.length, multiCharDelim));
+               assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+               parser.resetParserState();
+               assertEquals(5, parser.nextStringEndPos(mBytes3, 2, 
mBytes3.length, multiCharDelim));
+               assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+               byte[] mBytes4 = 
"a|#|b|#|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+               parser.resetParserState();
+               assertEquals(5, parser.nextStringEndPos(mBytes4, 2, 
mBytes4.length, multiCharDelim));
+               assertEquals(ParseErrorState.NONE, parser.getErrorState());
+
+       }
+
+}
+
+/**
+ * A FieldParser just for nextStringEndPos test.
+ *
+ * @param <T> The type that is parsed.
+ */
+class TestFieldParser<T> extends FieldParser<T>{
+
+       @Override
+       protected int parseField(byte[] bytes, int startPos, int limit, byte[] 
delim, T reuse) {
+               return 0;
+       }
+
+       @Override
+       public T getLastResult() {
+               return null;
+       }
+
+       @Override
+       public T createValue() {
+               return null;
+       }
+
+       @Override
+       protected void resetParserState() {
+               super.resetParserState();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7a7fa7/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 51ace12..c0e01b1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -405,28 +405,31 @@ public abstract class ParserTestBase<T> extends TestLogger {
        }
 
        @Test
-       public void testEmptyFieldInIsolation() {
+       public void testTrailingEmptyField() {
                try {
-                       String [] emptyStrings = new String[] {"|"};
-
                        FieldParser<T> parser = getParser();
 
-                       for (String emptyString : emptyStrings) {
-                               byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-                               int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+                       byte[] bytes = 
"||".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+                       for (int i = 0; i < 2; i++) {
+
+                               // test empty field with trailing delimiter when i = 0,
+                               // test empty field without trailing delimiter when i = 1.
+                               int numRead = parser.parseField(bytes, i, 
bytes.length, new byte[]{'|'}, parser.createValue());
 
                                
assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-                               if(this.allowsEmptyField()) {
+                               if (this.allowsEmptyField()) {
                                        assertTrue("Parser declared the empty 
string as invalid.", numRead != -1);
-                                       assertEquals("Invalid number of bytes 
read returned.", bytes.length, numRead);
-                               }
-                               else {
+                                       assertEquals("Invalid number of bytes 
read returned.", i + 1, numRead);
+                               } else {
                                        assertTrue("Parser accepted the empty 
string.", numRead == -1);
                                }
+
+                               parser.resetParserState();
                        }
-               }
-               catch (Exception e) {
+
+               } catch (Exception e) {
                        System.err.println(e.getMessage());
                        e.printStackTrace();
                        fail("Test erroneous: " + e.getMessage());

Reply via email to