Repository: flume
Updated Branches:
  refs/heads/trunk 88b3fee10 -> 0421fa2ab
FLUME-2801. Performance improvement on TailDir source (Jun Seok Hong via Satoshi Iijima and Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0421fa2a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0421fa2a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0421fa2a Branch: refs/heads/trunk Commit: 0421fa2ab1eb9575b34bbb2f44e8c6d83842eaeb Parents: 88b3fee Author: Roshan Naik <[email protected]> Authored: Thu Dec 17 10:35:56 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Dec 17 10:39:49 2015 -0800 ---------------------------------------------------------------------- .../taildir/ReliableTaildirEventReader.java | 4 +- .../apache/flume/source/taildir/TailFile.java | 129 ++++++++++++++----- 2 files changed, 101 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 951b786..5b6d465 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -195,7 +195,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { } logger.info("Last read was never committed - resetting position"); long lastPos = currentFile.getPos(); - currentFile.getRaf().seek(lastPos); + currentFile.updateFilePos(lastPos); } List<Event> events 
= currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset); if (events.isEmpty()) { @@ -223,7 +223,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { @Override public void commit() throws IOException { if (!committed && currentFile != null) { - long pos = currentFile.getRaf().getFilePointer(); + long pos = currentFile.getLineReadPos(); currentFile.setPos(pos); currentFile.setLastUpdated(updateTime); committed = true; http://git-wip-us.apache.org/repos/asf/flume/blob/0421fa2a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java index 99683da..eabd357 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -28,22 +28,21 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; import com.google.common.collect.Lists; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; public class TailFile { private static final Logger logger = LoggerFactory.getLogger(TailFile.class); - private static final String LINE_SEP = "\n"; - private static final String LINE_SEP_WIN = "\r\n"; + private static final byte BYTE_NL = (byte) 10; + private static final byte BYTE_CR = (byte) 13; + + private final static int BUFFER_SIZE = 8192; + private final static int NEED_READING = -1; private RandomAccessFile 
raf; private final String path; @@ -52,17 +51,26 @@ public class TailFile { private long lastUpdated; private boolean needTail; private final Map<String, String> headers; + private byte[] buffer; + private byte[] oldBuffer; + private int bufferPos; + private long lineReadPos; public TailFile(File file, Map<String, String> headers, long inode, long pos) throws IOException { this.raf = new RandomAccessFile(file, "r"); - if (pos > 0) raf.seek(pos); + if (pos > 0) { + raf.seek(pos); + lineReadPos=pos; + } this.path = file.getAbsolutePath(); this.inode = inode; this.pos = pos; this.lastUpdated = 0L; this.needTail = true; this.headers = headers; + this.oldBuffer = new byte[0]; + this.bufferPos= NEED_READING; } public RandomAccessFile getRaf() { return raf; } @@ -72,20 +80,29 @@ public class TailFile { public long getLastUpdated() { return lastUpdated; } public boolean needTail() { return needTail; } public Map<String, String> getHeaders() { return headers; } + public long getLineReadPos() { return lineReadPos; } public void setPos(long pos) { this.pos = pos; } public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; } public void setNeedTail(boolean needTail) { this.needTail = needTail; } + public void setLineReadPos(long lineReadPos) { this.lineReadPos = lineReadPos; } public boolean updatePos(String path, long inode, long pos) throws IOException { if (this.inode == inode && this.path.equals(path)) { - raf.seek(pos); setPos(pos); + updateFilePos(pos); logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos); return true; } return false; } + public void updateFilePos(long pos) throws IOException { + raf.seek(pos); + lineReadPos = pos; + bufferPos= NEED_READING; + oldBuffer = new byte[0]; + } + public List<Event> readEvents(int numEvents, boolean backoffWithoutNL, boolean addByteOffset) throws IOException { @@ -101,44 +118,87 @@ public class TailFile { } private Event readEvent(boolean backoffWithoutNL, boolean 
addByteOffset) throws IOException { - Long posTmp = raf.getFilePointer(); - String line = readLine(); + Long posTmp = getLineReadPos(); + LineResult line = readLine(); if (line == null) { return null; } - if (backoffWithoutNL && !line.endsWith(LINE_SEP)) { + if (backoffWithoutNL && !line.lineSepInclude) { logger.info("Backing off in file without newline: " + path + ", inode: " + inode + ", pos: " + raf.getFilePointer()); - raf.seek(posTmp); + updateFilePos(posTmp); return null; } - - String lineSep = LINE_SEP; - if(line.endsWith(LINE_SEP_WIN)) { - lineSep = LINE_SEP_WIN; - } - Event event = EventBuilder.withBody(StringUtils.removeEnd(line, lineSep), Charsets.UTF_8); + Event event = EventBuilder.withBody(line.line); if (addByteOffset == true) { event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, posTmp.toString()); } return event; } - private String readLine() throws IOException { - ByteArrayDataOutput out = ByteStreams.newDataOutput(300); - int i = 0; - int c; - while ((c = raf.read()) != -1) { - i++; - out.write((byte) c); - if (c == LINE_SEP.charAt(0)) { + private void readFile() throws IOException { + if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) { + buffer = new byte[(int) (raf.length() - raf.getFilePointer())]; + } else { + buffer = new byte[BUFFER_SIZE]; + } + raf.read(buffer, 0, buffer.length); + bufferPos = 0; + } + + private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA, byte[] b, int startIdxB, int lenB) { + byte[] c = new byte[lenA + lenB]; + System.arraycopy(a, startIdxA, c, 0, lenA); + System.arraycopy(b, startIdxB, c, lenA, lenB); + return c; + } + + public LineResult readLine() throws IOException { + LineResult lineResult = null; + while (true) { + if (bufferPos == NEED_READING) { + if (raf.getFilePointer() < raf.length()) { + readFile(); + } else { + if (oldBuffer.length > 0) { + lineResult = new LineResult(false, oldBuffer); + oldBuffer = new byte[0]; + setLineReadPos(lineReadPos + lineResult.line.length); + } + break; + } + } + 
for (int i = bufferPos; i < buffer.length; i++) { + if (buffer[i] == BYTE_NL) { + int oldLen = oldBuffer.length; + // Don't copy last byte(NEW_LINE) + int lineLen = i - bufferPos; + // For windows, check for CR + if (i > 0 && buffer[i - 1] == BYTE_CR) { + lineLen -= 1; + } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) { + oldLen -= 1; + } + lineResult = new LineResult(true, + concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen)); + setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1))); + oldBuffer = new byte[0]; + if (i + 1 < buffer.length) { + bufferPos = i + 1; + } else { + bufferPos = NEED_READING; + } + break; + } + } + if (lineResult != null) { break; } + // NEW_LINE not showed up at the end of the buffer + oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, (buffer.length - bufferPos)); + bufferPos = NEED_READING; } - if (i == 0) { - return null; - } - return new String(out.toByteArray(), Charsets.UTF_8); + return lineResult; } public void close() { @@ -159,5 +219,14 @@ public class TailFile { } } + private class LineResult { + final boolean lineSepInclude; + final byte[] line; + public LineResult(boolean lineSepInclude, byte[] line) { + super(); + this.lineSepInclude = lineSepInclude; + this.line = line; + } + } }
