Updated Branches: refs/heads/flume-1.5 2cecd8f74 -> eb99bb41e
FLUME-2182. Spooling Directory Source will not ingest data completely when a wide character appears at the edge of a buffer (Sven Meys via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/eb99bb41 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/eb99bb41 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/eb99bb41 Branch: refs/heads/flume-1.5 Commit: eb99bb41ea813ab9cdd3e9ec76d54bd6d3628a82 Parents: 2cecd8f Author: Mike Percy <[email protected]> Authored: Fri Sep 27 17:01:41 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 27 17:16:27 2013 -0700 ---------------------------------------------------------------------- .../ResettableFileInputStream.java | 9 +++- .../TestResettableFileInputStream.java | 46 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/eb99bb41/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java index 09f490f..ecea5e2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java @@ -64,6 +64,7 @@ public class ResettableFileInputStream extends ResettableInputStream private final CharsetDecoder decoder; private long position; private long syncPosition; + private int maxCharWidth; /** * @@ -112,6 +113,7 @@ public class ResettableFileInputStream extends ResettableInputStream this.decoder = charset.newDecoder(); this.position = 0; this.syncPosition = 0; + this.maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar()); seek(tracker.getPosition()); } @@ -152,7 +154,12 @@ public class ResettableFileInputStream extends ResettableInputStream @Override public synchronized int readChar() throws IOException { - if (!buf.hasRemaining()) { + // The decoder can have issues with multi-byte characters. + // This check ensures that there are at least maxCharWidth bytes in the buffer + // before reaching EOF. + if (buf.remaining() < maxCharWidth) { + buf.clear(); + buf.flip(); refillBuf(); } http://git-wip-us.apache.org/repos/asf/flume/blob/eb99bb41/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java index 5ad6a0a..066765c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java @@ -18,6 +18,7 @@ package org.apache.flume.serialization; import com.google.common.base.Charsets; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.io.Files; import junit.framework.Assert; @@ -87,6 +88,27 @@ public class TestResettableFileInputStream { } /** + * Ensure that we can process lines that contain multi byte characters in weird places + * such as at the end of a buffer. + * @throws IOException + */ + @Test + public void testWideCharRead() throws IOException { + String output = wideCharFileInit(file, Charsets.UTF_8); + + PositionTracker tracker = new DurablePositionTracker(meta, file.getPath()); + ResettableInputStream in = new ResettableFileInputStream(file, tracker); + + String result = readLine(in, output.length()); + assertEquals(output, result); + + String afterEOF = readLine(in, output.length()); + assertNull(afterEOF); + + in.close(); + } + + /** * Ensure a reset() brings us back to the default mark (beginning of file) * @throws IOException */ @@ -230,6 +252,30 @@ public class TestResettableFileInputStream { } /** + * Helper method that generates a line to test if parts of multi-byte characters on the + * edge of a buffer are handled properly. + */ + private static String generateWideCharLine(){ + String s = "éllo Wörld!\n"; + int size = (ResettableFileInputStream.DEFAULT_BUF_SIZE - 1) + s.length(); + return Strings.padStart(s, size , 'H'); + } + + /** + * Creates a file that contains a line that contains wide characters + * @param file + * @param charset + * @return + * @throws IOException + */ + private static String wideCharFileInit(File file, Charset charset) + throws IOException { + String output = generateWideCharLine(); + Files.write(output.getBytes(charset), file); + return output; + } + + /** * Helper function to read a line from a character stream. * @param in * @param maxLength
