Repository: flume Updated Branches: refs/heads/flume-1.7 0a34e1e87 -> d02013f4e
FLUME-2498. Implement Taildir Source (Satoshi Iijima via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d02013f4 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d02013f4 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d02013f4 Branch: refs/heads/flume-1.7 Commit: d02013f4e1ee429b57f24bdfad72e6c6707d0653 Parents: 0a34e1e Author: Roshan Naik <[email protected]> Authored: Mon Aug 17 19:34:03 2015 -0700 Committer: Roshan Naik <[email protected]> Committed: Mon Aug 17 19:34:03 2015 -0700 ---------------------------------------------------------------------- .../flume/conf/source/SourceConfiguration.java | 10 +- .../apache/flume/conf/source/SourceType.java | 10 +- flume-ng-dist/pom.xml | 4 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 55 ++- flume-ng-sources/flume-taildir-source/pom.xml | 60 +++ .../taildir/ReliableTaildirEventReader.java | 347 +++++++++++++ .../apache/flume/source/taildir/TailFile.java | 163 ++++++ .../flume/source/taildir/TaildirSource.java | 331 +++++++++++++ .../TaildirSourceConfigurationConstants.java | 52 ++ .../source/taildir/TestTaildirEventReader.java | 492 +++++++++++++++++++ .../flume/source/taildir/TestTaildirSource.java | 283 +++++++++++ flume-ng-sources/pom.xml | 1 + pom.xml | 6 + 13 files changed, 1811 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java index 899d805..068bd69 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java +++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java @@ -207,7 +207,15 @@ public class SourceConfiguration extends ComponentConfiguration { * * @see org.apache.flume.source.jms.JMSSource */ - JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration"); + JMS("org.apache.flume.conf.source.jms.JMSSourceConfiguration"), + + /** + * TAILDIR Source + * + * @see org.apache.flume.source.taildir.TaildirSource + */ + TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration") + ; private String srcConfigurationName; http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java index 4144faa..4f4073a 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java @@ -103,7 +103,15 @@ public enum SourceType { * * @see org.apache.flume.source.jms.JMSSource */ - JMS("org.apache.flume.source.jms.JMSSource"); + JMS("org.apache.flume.source.jms.JMSSource"), + + /** + * Taildir Source + * + * @see org.apache.flume.source.taildir.TaildirSource + */ + TAILDIR("org.apache.flume.source.taildir.TaildirSource") + ; private final String sourceClassName; http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index 218c6b8..7fdf36a 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -189,6 +189,10 @@ <artifactId>flume-thrift-source</artifactId> </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + 
<artifactId>flume-taildir-source</artifactId> + </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 7ddcc48..897a2ca 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -994,7 +994,7 @@ trackerDir .flumespool Directory to store metadata related to pro consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``, ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified time of the files will be used to compare the files. In case of a tie, the file - with smallest laxicographical order will be consumed first. In case of ``random`` any + with smallest lexicographical order will be consumed first. In case of ``random`` any file will be picked randomly. When using ``oldest`` and ``youngest`` the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using ``random`` may cause old files to be consumed @@ -1090,6 +1090,59 @@ Property Name Default Description deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request ========================== ================== ======================================================================= +Taildir Source +~~~~~~~~~~~~~~~~~~~~~~~~~ +.. note:: **This source is provided as a preview feature. It does not work on Windows.** This source requires Java version 1.7 or later. + +Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. 
+If the new lines are being written, this source will retry reading them in wait for the completion of the write. + +This source is reliable and will not miss data even when the tailing files rotate. +It periodically writes the last read position of each files on the given position file in JSON format. +If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file. + +In other use case, this source can also start tailing from the arbitrary position for each files using the given position file. +When there is no position file on the specified path, it will start tailing from the first line of each files by default. + +Files will be consumed in order of their modification time. File with the oldest modification time will be consumed first. + +This source does not rename or delete or do any modifications to the file being tailed. +Currently this source does not support tailing binary files. It reads text files line by line. + +=================================== ============================== =================================================== +Property Name Default Description +=================================== ============================== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``TAILDIR``. +**filegroups** -- Space-separated list of file groups. Each file group indicates a set of files to be tailed. +**filegroups.<filegroupName>** -- Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only. +positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file. +headers.<filegroupName>.<headerKey> -- Header value which is the set with header key. Multiple headers can be specified for one file group. 
+byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called 'byteoffset'. +skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file. +idleTimeout 120000 Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. +writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file. +batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine. +backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. +maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. +=================================== ============================== =================================================== + +Example for agent named a1: + +.. 
code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.sources.r1.type = TAILDIR + a1.sources.r1.channels = c1 + a1.sources.r1.positionFile = /var/log/flume/taildir_position.json + a1.sources.r1.filegroups = f1 f2 + a1.sources.r1.filegroups.f1 = /var/log/test1/example.log + a1.sources.r1.headers.f1.headerKey1 = value1 + a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* + a1.sources.r1.headers.f2.headerKey1 = value2 + a1.sources.r1.headers.f2.headerKey2 = value2-2 + Twitter 1% firehose Source (experimental) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml new file mode 100644 index 0000000..09063fb --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-ng-sources</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.7.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-taildir-source</artifactId> + <name>Flume Taildir Source</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java new file mode 100644 index 0000000..951b786 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source.taildir; + +import java.io.File; +import java.io.FileFilter; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Pattern; + +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.client.avro.ReliableEventReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import com.google.common.collect.Table.Cell; +import com.google.gson.stream.JsonReader; + [email protected] [email protected] +public class ReliableTaildirEventReader implements ReliableEventReader { + private static final Logger logger = 
LoggerFactory.getLogger(ReliableTaildirEventReader.class); + + private final Table<String, File, Pattern> tailFileTable; + private final Table<String, String, String> headerTable; + + private TailFile currentFile = null; + private Map<Long, TailFile> tailFiles = Maps.newHashMap(); + private long updateTime; + private boolean addByteOffset; + private boolean committed = true; + + /** + * Create a ReliableTaildirEventReader to watch the given directory. + */ + private ReliableTaildirEventReader(Map<String, String> filePaths, + Table<String, String, String> headerTable, String positionFilePath, + boolean skipToEnd, boolean addByteOffset) throws IOException { + // Sanity checks + Preconditions.checkNotNull(filePaths); + Preconditions.checkNotNull(positionFilePath); + + if (logger.isDebugEnabled()) { + logger.debug("Initializing {} with directory={}, metaDir={}", + new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths }); + } + + Table<String, File, Pattern> tailFileTable = HashBasedTable.create(); + for (Entry<String, String> e : filePaths.entrySet()) { + File f = new File(e.getValue()); + File parentDir = f.getParentFile(); + Preconditions.checkState(parentDir.exists(), + "Directory does not exist: " + parentDir.getAbsolutePath()); + Pattern fileNamePattern = Pattern.compile(f.getName()); + tailFileTable.put(e.getKey(), parentDir, fileNamePattern); + } + logger.info("tailFileTable: " + tailFileTable.toString()); + logger.info("headerTable: " + headerTable.toString()); + + this.tailFileTable = tailFileTable; + this.headerTable = headerTable; + this.addByteOffset = addByteOffset; + updateTailFiles(skipToEnd); + + logger.info("Updating position from position file: " + positionFilePath); + loadPositionFile(positionFilePath); + } + + /** + * Load a position file which has the last read position of each file. + * If the position file exists, update tailFiles mapping. 
+ */ + public void loadPositionFile(String filePath) { + Long inode, pos; + String path; + FileReader fr = null; + JsonReader jr = null; + try { + fr = new FileReader(filePath); + jr = new JsonReader(fr); + jr.beginArray(); + while (jr.hasNext()) { + inode = null; + pos = null; + path = null; + jr.beginObject(); + while (jr.hasNext()) { + switch (jr.nextName()) { + case "inode": + inode = jr.nextLong(); + break; + case "pos": + pos = jr.nextLong(); + break; + case "file": + path = jr.nextString(); + break; + } + } + jr.endObject(); + + for (Object v : Arrays.asList(inode, pos, path)) { + Preconditions.checkNotNull(v, "Detected missing value in position file. " + + "inode: " + inode + ", pos: " + pos + ", path: " + path); + } + TailFile tf = tailFiles.get(inode); + if (tf != null && tf.updatePos(path, inode, pos)) { + tailFiles.put(inode, tf); + } else { + logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos); + } + } + jr.endArray(); + } catch (FileNotFoundException e) { + logger.info("File not found: " + filePath + ", not updating position"); + } catch (IOException e) { + logger.error("Failed loading positionFile: " + filePath, e); + } finally { + try { + if (fr != null) fr.close(); + if (jr != null) jr.close(); + } catch (IOException e) { + logger.error("Error: " + e.getMessage(), e); + } + } + } + + public Map<Long, TailFile> getTailFiles() { + return tailFiles; + } + + public void setCurrentFile(TailFile currentFile) { + this.currentFile = currentFile; + } + + @Override + public Event readEvent() throws IOException { + List<Event> events = readEvents(1); + if (events.isEmpty()) { + return null; + } + return events.get(0); + } + + @Override + public List<Event> readEvents(int numEvents) throws IOException { + return readEvents(numEvents, false); + } + + @VisibleForTesting + public List<Event> readEvents(TailFile tf, int numEvents) throws IOException { + setCurrentFile(tf); + return readEvents(numEvents, true); + } + + public List<Event> 
readEvents(int numEvents, boolean backoffWithoutNL) + throws IOException { + if (!committed) { + if (currentFile == null) { + throw new IllegalStateException("current file does not exist. " + currentFile.getPath()); + } + logger.info("Last read was never committed - resetting position"); + long lastPos = currentFile.getPos(); + currentFile.getRaf().seek(lastPos); + } + List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset); + if (events.isEmpty()) { + return events; + } + + Map<String, String> headers = currentFile.getHeaders(); + if (headers != null && !headers.isEmpty()) { + for (Event event : events) { + event.getHeaders().putAll(headers); + } + } + committed = false; + return events; + } + + @Override + public void close() throws IOException { + for (TailFile tf : tailFiles.values()) { + if (tf.getRaf() != null) tf.getRaf().close(); + } + } + + /** Commit the last lines which were read. */ + @Override + public void commit() throws IOException { + if (!committed && currentFile != null) { + long pos = currentFile.getRaf().getFilePointer(); + currentFile.setPos(pos); + currentFile.setLastUpdated(updateTime); + committed = true; + } + } + + /** + * Update tailFiles mapping if a new file is created or appends are detected + * to the existing file. + */ + public List<Long> updateTailFiles(boolean skipToEnd) throws IOException { + updateTime = System.currentTimeMillis(); + List<Long> updatedInodes = Lists.newArrayList(); + + for (Cell<String, File, Pattern> cell : tailFileTable.cellSet()) { + Map<String, String> headers = headerTable.row(cell.getRowKey()); + File parentDir = cell.getColumnKey(); + Pattern fileNamePattern = cell.getValue(); + + for (File f : getMatchFiles(parentDir, fileNamePattern)) { + long inode = getInode(f); + TailFile tf = tailFiles.get(inode); + if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { + long startPos = skipToEnd ? 
f.length() : 0; + tf = openFile(f, headers, inode, startPos); + } else{ + boolean updated = tf.getLastUpdated() < f.lastModified(); + if (updated) { + if (tf.getRaf() == null) { + tf = openFile(f, headers, inode, tf.getPos()); + } + if (f.length() < tf.getPos()) { + logger.info("Pos " + tf.getPos() + " is larger than file size! " + + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode); + tf.updatePos(tf.getPath(), inode, 0); + } + } + tf.setNeedTail(updated); + } + tailFiles.put(inode, tf); + updatedInodes.add(inode); + } + } + return updatedInodes; + } + + public List<Long> updateTailFiles() throws IOException { + return updateTailFiles(false); + } + + private List<File> getMatchFiles(File parentDir, final Pattern fileNamePattern) { + FileFilter filter = new FileFilter() { + public boolean accept(File f) { + String fileName = f.getName(); + if (f.isDirectory() || !fileNamePattern.matcher(fileName).matches()) { + return false; + } + return true; + } + }; + File[] files = parentDir.listFiles(filter); + ArrayList<File> result = Lists.newArrayList(files); + Collections.sort(result, new TailFile.CompareByLastModifiedTime()); + return result; + } + + private long getInode(File file) throws IOException { + long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); + return inode; + } + + private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) { + try { + logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos); + return new TailFile(file, headers, inode, pos); + } catch (IOException e) { + throw new FlumeException("Failed opening file: " + file, e); + } + } + + /** + * Special builder class for ReliableTaildirEventReader + */ + public static class Builder { + private Map<String, String> filePaths; + private Table<String, String, String> headerTable; + private String positionFilePath; + private boolean skipToEnd; + private boolean addByteOffset; + + public Builder filePaths(Map<String, String> 
filePaths) { + this.filePaths = filePaths; + return this; + } + + public Builder headerTable(Table<String, String, String> headerTable) { + this.headerTable = headerTable; + return this; + } + + public Builder positionFilePath(String positionFilePath) { + this.positionFilePath = positionFilePath; + return this; + } + + public Builder skipToEnd(boolean skipToEnd) { + this.skipToEnd = skipToEnd; + return this; + } + + public Builder addByteOffset(boolean addByteOffset) { + this.addByteOffset = addByteOffset; + return this; + } + + public ReliableTaildirEventReader build() throws IOException { + return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset); + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java new file mode 100644 index 0000000..99683da --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source.taildir; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + +public class TailFile { + private static final Logger logger = LoggerFactory.getLogger(TailFile.class); + + private static final String LINE_SEP = "\n"; + private static final String LINE_SEP_WIN = "\r\n"; + + private RandomAccessFile raf; + private final String path; + private final long inode; + private long pos; + private long lastUpdated; + private boolean needTail; + private final Map<String, String> headers; + + public TailFile(File file, Map<String, String> headers, long inode, long pos) + throws IOException { + this.raf = new RandomAccessFile(file, "r"); + if (pos > 0) raf.seek(pos); + this.path = file.getAbsolutePath(); + this.inode = inode; + this.pos = pos; + this.lastUpdated = 0L; + this.needTail = true; + this.headers = headers; + } + + public RandomAccessFile getRaf() { return raf; } + public String getPath() { return path; } + public long getInode() { return inode; } + public long 
getPos() { return pos; } + public long getLastUpdated() { return lastUpdated; } + public boolean needTail() { return needTail; } + public Map<String, String> getHeaders() { return headers; } + + public void setPos(long pos) { this.pos = pos; } + public void setLastUpdated(long lastUpdated) { this.lastUpdated = lastUpdated; } + public void setNeedTail(boolean needTail) { this.needTail = needTail; } + + public boolean updatePos(String path, long inode, long pos) throws IOException { + if (this.inode == inode && this.path.equals(path)) { + raf.seek(pos); + setPos(pos); + logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos); + return true; + } + return false; + } + + public List<Event> readEvents(int numEvents, boolean backoffWithoutNL, + boolean addByteOffset) throws IOException { + List<Event> events = Lists.newLinkedList(); + for (int i = 0; i < numEvents; i++) { + Event event = readEvent(backoffWithoutNL, addByteOffset); + if (event == null) { + break; + } + events.add(event); + } + return events; + } + + private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException { + Long posTmp = raf.getFilePointer(); + String line = readLine(); + if (line == null) { + return null; + } + if (backoffWithoutNL && !line.endsWith(LINE_SEP)) { + logger.info("Backing off in file without newline: " + + path + ", inode: " + inode + ", pos: " + raf.getFilePointer()); + raf.seek(posTmp); + return null; + } + + String lineSep = LINE_SEP; + if(line.endsWith(LINE_SEP_WIN)) { + lineSep = LINE_SEP_WIN; + } + Event event = EventBuilder.withBody(StringUtils.removeEnd(line, lineSep), Charsets.UTF_8); + if (addByteOffset == true) { + event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, posTmp.toString()); + } + return event; + } + + private String readLine() throws IOException { + ByteArrayDataOutput out = ByteStreams.newDataOutput(300); + int i = 0; + int c; + while ((c = raf.read()) != -1) { + i++; + out.write((byte) c); + if (c == 
LINE_SEP.charAt(0)) { + break; + } + } + if (i == 0) { + return null; + } + return new String(out.toByteArray(), Charsets.UTF_8); + } + + public void close() { + try { + raf.close(); + raf = null; + long now = System.currentTimeMillis(); + setLastUpdated(now); + } catch (IOException e) { + logger.error("Failed closing file: " + path + ", inode: " + inode, e); + } + } + + public static class CompareByLastModifiedTime implements Comparator<File> { + @Override + public int compare(File f1, File f2) { + return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java new file mode 100644 index 0000000..97ca43b --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.flume.source.taildir; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.PollableSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SourceCounter; +import org.apache.flume.source.AbstractSource; +import org.apache.flume.source.PollableSourceConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.Gson; + +public class TaildirSource extends AbstractSource implements + PollableSource, Configurable { + + private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class); + + private Map<String, String> filePaths; + private Table<String, String, String> headerTable; + private int batchSize; + private String positionFilePath; + private boolean skipToEnd; + private boolean byteOffsetHeader; + + private SourceCounter sourceCounter; + private ReliableTaildirEventReader reader; 
+ private ScheduledExecutorService idleFileChecker; + private ScheduledExecutorService positionWriter; + private int retryInterval = 1000; + private int maxRetryInterval = 5000; + private int idleTimeout; + private int checkIdleInterval = 5000; + private int writePosInitDelay = 5000; + private int writePosInterval; + + private List<Long> existingInodes = new CopyOnWriteArrayList<Long>(); + private List<Long> idleInodes = new CopyOnWriteArrayList<Long>(); + private Long backoffSleepIncrement; + private Long maxBackOffSleepInterval; + + @Override + public synchronized void start() { + logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths); + try { + reader = new ReliableTaildirEventReader.Builder() + .filePaths(filePaths) + .headerTable(headerTable) + .positionFilePath(positionFilePath) + .skipToEnd(skipToEnd) + .addByteOffset(byteOffsetHeader) + .build(); + } catch (IOException e) { + throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); + } + idleFileChecker = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build()); + idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), + idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS); + + positionWriter = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("positionWriter").build()); + positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), + writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS); + + super.start(); + logger.debug("TaildirSource started"); + sourceCounter.start(); + } + + @Override + public synchronized void stop() { + try { + super.stop(); + ExecutorService[] services = {idleFileChecker, positionWriter}; + for (ExecutorService service : services) { + service.shutdown(); + if (!service.awaitTermination(1, TimeUnit.SECONDS)) { + service.shutdownNow(); + } + } + // write the last position + writePosition(); + reader.close(); 
+ } catch (InterruptedException e) { + logger.info("Interrupted while awaiting termination", e); + } catch (IOException e) { + logger.info("Failed: " + e.getMessage(), e); + } + sourceCounter.stop(); + logger.info("Taildir source {} stopped. Metrics: {}", getName(), sourceCounter); + } + + @Override + public String toString() { + return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, " + + "byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", + positionFilePath, skipToEnd, byteOffsetHeader, idleTimeout, writePosInterval); + } + + @Override + public synchronized void configure(Context context) { + String fileGroups = context.getString(FILE_GROUPS); + Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS); + + filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), fileGroups.split("\\s+")); + Preconditions.checkState(!filePaths.isEmpty(), + "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); + + String homePath = System.getProperty("user.home").replace('\\', '/'); + positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE); + headerTable = getTable(context, HEADERS_PREFIX); + batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); + skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END); + byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER); + idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT); + writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL); + + backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT + , PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); + maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP + , PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + + if (sourceCounter == null) { + sourceCounter = new SourceCounter(getName()); + } + } + + private 
Map<String, String> selectByKeys(Map<String, String> map, String[] keys) { + Map<String, String> result = Maps.newHashMap(); + for (String key : keys) { + if (map.containsKey(key)) { + result.put(key, map.get(key)); + } + } + return result; + } + + private Table<String, String, String> getTable(Context context, String prefix) { + Table<String, String, String> table = HashBasedTable.create(); + for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) { + String[] parts = e.getKey().split("\\.", 2); + table.put(parts[0], parts[1], e.getValue()); + } + return table; + } + + @VisibleForTesting + protected SourceCounter getSourceCounter() { + return sourceCounter; + } + + @Override + public Status process() { + Status status = Status.READY; + try { + existingInodes.clear(); + existingInodes.addAll(reader.updateTailFiles()); + for (long inode : existingInodes) { + TailFile tf = reader.getTailFiles().get(inode); + if (tf.needTail()) { + tailFileProcess(tf, true); + } + } + closeTailFiles(); + try { + TimeUnit.MILLISECONDS.sleep(retryInterval); + } catch (InterruptedException e) { + logger.info("Interrupted while sleeping"); + } + } catch (Throwable t) { + logger.error("Unable to tail files", t); + status = Status.BACKOFF; + } + return status; + } + + @Override + public long getBackOffSleepIncrement() { + return backoffSleepIncrement; + } + + @Override + public long getMaxBackOffSleepInterval() { + return maxBackOffSleepInterval; + } + + private void tailFileProcess(TailFile tf, boolean backoffWithoutNL) + throws IOException, InterruptedException { + while (true) { + reader.setCurrentFile(tf); + List<Event> events = reader.readEvents(batchSize, backoffWithoutNL); + if (events.isEmpty()) { + break; + } + sourceCounter.addToEventReceivedCount(events.size()); + sourceCounter.incrementAppendBatchReceivedCount(); + try { + getChannelProcessor().processEventBatch(events); + reader.commit(); + } catch (ChannelException ex) { + logger.warn("The channel is full 
or unexpected failure. " + + "The source will try again after " + retryInterval + " ms"); + TimeUnit.MILLISECONDS.sleep(retryInterval); + retryInterval = retryInterval << 1; + retryInterval = Math.min(retryInterval, maxRetryInterval); + continue; + } + retryInterval = 1000; + sourceCounter.addToEventAcceptedCount(events.size()); + sourceCounter.incrementAppendBatchAcceptedCount(); + if (events.size() < batchSize) { + break; + } + } + } + + private void closeTailFiles() throws IOException, InterruptedException { + for (long inode : idleInodes) { + TailFile tf = reader.getTailFiles().get(inode); + if (tf.getRaf() != null) { // when file has not closed yet + tailFileProcess(tf, false); + tf.close(); + logger.info("Closed file: " + tf.getPath() + ", inode: " + inode + ", pos: " + tf.getPos()); + } + } + idleInodes.clear(); + } + + /** + * Runnable class that checks whether there are files which should be closed. + */ + private class idleFileCheckerRunnable implements Runnable { + @Override + public void run() { + try { + long now = System.currentTimeMillis(); + for (TailFile tf : reader.getTailFiles().values()) { + if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) { + idleInodes.add(tf.getInode()); + } + } + } catch (Throwable t) { + logger.error("Uncaught exception in IdleFileChecker thread", t); + } + } + } + + /** + * Runnable class that writes a position file which has the last read position + * of each file. 
+ */ + private class PositionWriterRunnable implements Runnable { + @Override + public void run() { + writePosition(); + } + } + + private void writePosition() { + File file = new File(positionFilePath); + FileWriter writer = null; + try { + writer = new FileWriter(file); + if (!existingInodes.isEmpty()) { + String json = toPosInfoJson(); + writer.write(json); + } + } catch (Throwable t){ + logger.error("Failed writing positionFile", t); + } finally { + try { + if (writer != null) writer.close(); + } catch (IOException e) { + logger.error("Error: " + e.getMessage(), e); + } + } + } + + private String toPosInfoJson() { + @SuppressWarnings("rawtypes") + List<Map> posInfos = Lists.newArrayList(); + for (Long inode : existingInodes) { + TailFile tf = reader.getTailFiles().get(inode); + posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath())); + } + return new Gson().toJson(posInfos); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java new file mode 100644 index 0000000..6165276 --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.flume.source.taildir;

/**
 * Configuration property names and default values for {@code TaildirSource}.
 * Pure constants holder; not instantiable.
 */
public class TaildirSourceConfigurationConstants {

  /** Mapping for tailing file groups. */
  public static final String FILE_GROUPS = "filegroups";
  public static final String FILE_GROUPS_PREFIX = FILE_GROUPS + ".";

  /** Mapping for putting headers to events grouped by file groups. */
  public static final String HEADERS_PREFIX = "headers.";

  /** Path of position file. Default is resolved relative to the user home directory. */
  public static final String POSITION_FILE = "positionFile";
  public static final String DEFAULT_POSITION_FILE = "/.flume/taildir_position.json";

  /** What size to batch with before sending to ChannelProcessor. */
  public static final String BATCH_SIZE = "batchSize";
  public static final int DEFAULT_BATCH_SIZE = 100;

  /** Whether to skip the position to EOF in the case of files not written on the position file. */
  public static final String SKIP_TO_END = "skipToEnd";
  public static final boolean DEFAULT_SKIP_TO_END = false;

  /** Time (ms) to close idle files. */
  public static final String IDLE_TIMEOUT = "idleTimeout";
  public static final int DEFAULT_IDLE_TIMEOUT = 120000;

  /** Interval time (ms) to write the last position of each file on the position file. */
  public static final String WRITE_POS_INTERVAL = "writePosInterval";
  public static final int DEFAULT_WRITE_POS_INTERVAL = 3000;

  /** Whether to add the byte offset of a tailed line to the header. */
  public static final String BYTE_OFFSET_HEADER = "byteOffsetHeader";
  public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset";
  public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false;

  // Utility class: prevent instantiation.
  private TaildirSourceConfigurationConstants() {
  }
}
+ */ + +package org.apache.flume.source.taildir; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flume.Event; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; +import com.google.common.io.Files; + +public class TestTaildirEventReader { + private File tmpDir; + private String posFilePath; + + public static String bodyAsString(Event event) { + return new String(event.getBody()); + } + + static List<String> bodiesAsStrings(List<Event> events) { + List<String> bodies = Lists.newArrayListWithCapacity(events.size()); + for (Event event : events) { + bodies.add(new String(event.getBody())); + } + return bodies; + } + + static List<String> headersAsStrings(List<Event> events, String headerKey) { + List<String> headers = Lists.newArrayListWithCapacity(events.size()); + for (Event event : events) { + headers.add(new String(event.getHeaders().get(headerKey))); + } + return headers; + } + + private ReliableTaildirEventReader getReader(Map<String, String> filePaths, + Table<String, String, String> headerTable, boolean addByteOffset) { + ReliableTaildirEventReader reader; + try { + reader = new ReliableTaildirEventReader.Builder() + .filePaths(filePaths) + .headerTable(headerTable) + .positionFilePath(posFilePath) + .skipToEnd(false) + .addByteOffset(addByteOffset) + .build(); + reader.updateTailFiles(); + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } + return reader; + } + + private ReliableTaildirEventReader 
getReader(boolean addByteOffset) { + Map<String, String> filePaths = ImmutableMap.of("testFiles", tmpDir.getAbsolutePath() + "/file.*"); + Table<String, String, String> headerTable = HashBasedTable.create(); + return getReader(filePaths, headerTable, addByteOffset); + } + + private ReliableTaildirEventReader getReader() { + return getReader(false); + } + + @Before + public void setUp() { + tmpDir = Files.createTempDir(); + posFilePath = tmpDir.getAbsolutePath() + "/taildir_position_test.json"; + } + + @After + public void tearDown() { + for (File f : tmpDir.listFiles()) { + if (f.isDirectory()) { + for (File sdf : f.listFiles()) { + sdf.delete(); + } + } + f.delete(); + } + tmpDir.delete(); + } + + @Test + // Create three multi-line files then read them back out. Ensures that + // lines and appended ones are read correctly from files. + public void testBasicReadFiles() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + File f3 = new File(tmpDir, "file3"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8); + Files.write("file3line1\nfile3line2\n", f3, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out = Lists.newArrayList(); + for (TailFile tf : reader.getTailFiles().values()) { + List<String> bodies = bodiesAsStrings(reader.readEvents(tf, 2)); + out.addAll(bodies); + reader.commit(); + } + assertEquals(6, out.size()); + // Make sure we got every line + assertTrue(out.contains("file1line1")); + assertTrue(out.contains("file1line2")); + assertTrue(out.contains("file2line1")); + assertTrue(out.contains("file2line2")); + assertTrue(out.contains("file3line1")); + assertTrue(out.contains("file3line2")); + + Files.append("file3line3\nfile3line4\n", f3, Charsets.UTF_8); + + reader.updateTailFiles(); + for (TailFile tf : reader.getTailFiles().values()) { + List<String> bodies = 
bodiesAsStrings(reader.readEvents(tf, 2)); + out.addAll(bodies); + reader.commit(); + } + assertEquals(8, out.size()); + assertTrue(out.contains("file3line3")); + assertTrue(out.contains("file3line4")); + } + + @Test + // Make sure this works when there are initially no files + // and we finish reading all files and fully commit. + public void testInitiallyEmptyDirAndBehaviorAfterReadingAll() throws IOException { + ReliableTaildirEventReader reader = getReader(); + + List<Long> fileInodes = reader.updateTailFiles(); + assertEquals(0, fileInodes.size()); + + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + + reader.updateTailFiles(); + List<String> out = null; + for (TailFile tf : reader.getTailFiles().values()) { + out = bodiesAsStrings(reader.readEvents(tf, 2)); + reader.commit(); + } + assertEquals(2, out.size()); + // Make sure we got every line + assertTrue(out.contains("file1line1")); + assertTrue(out.contains("file1line2")); + + reader.updateTailFiles(); + List<String> empty = null; + for (TailFile tf : reader.getTailFiles().values()) { + empty = bodiesAsStrings(reader.readEvents(tf, 15)); + reader.commit(); + } + assertEquals(0, empty.size()); + } + + @Test + // Test a basic case where a commit is missed. 
+ public void testBasicCommitFailure() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n" + + "file1line9\nfile1line10\nfile1line11\nfile1line12\n", + f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out1 = null; + for (TailFile tf : reader.getTailFiles().values()) { + out1 = bodiesAsStrings(reader.readEvents(tf, 4)); + } + assertTrue(out1.contains("file1line1")); + assertTrue(out1.contains("file1line2")); + assertTrue(out1.contains("file1line3")); + assertTrue(out1.contains("file1line4")); + + List<String> out2 = bodiesAsStrings(reader.readEvents(4)); + assertTrue(out2.contains("file1line1")); + assertTrue(out2.contains("file1line2")); + assertTrue(out2.contains("file1line3")); + assertTrue(out2.contains("file1line4")); + + reader.commit(); + List<String> out3 = bodiesAsStrings(reader.readEvents(4)); + assertTrue(out3.contains("file1line5")); + assertTrue(out3.contains("file1line6")); + assertTrue(out3.contains("file1line7")); + assertTrue(out3.contains("file1line8")); + + reader.commit(); + List<String> out4 = bodiesAsStrings(reader.readEvents(4)); + assertEquals(4, out4.size()); + assertTrue(out4.contains("file1line9")); + assertTrue(out4.contains("file1line10")); + assertTrue(out4.contains("file1line11")); + assertTrue(out4.contains("file1line12")); + } + + @Test + // Test a case where a commit is missed and the batch size changes. 
+ public void testBasicCommitFailureAndBatchSizeChanges() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out1 = null; + for (TailFile tf : reader.getTailFiles().values()) { + out1 = bodiesAsStrings(reader.readEvents(tf, 5)); + } + assertTrue(out1.contains("file1line1")); + assertTrue(out1.contains("file1line2")); + assertTrue(out1.contains("file1line3")); + assertTrue(out1.contains("file1line4")); + assertTrue(out1.contains("file1line5")); + + List<String> out2 = bodiesAsStrings(reader.readEvents(2)); + assertTrue(out2.contains("file1line1")); + assertTrue(out2.contains("file1line2")); + + reader.commit(); + List<String> out3 = bodiesAsStrings(reader.readEvents(2)); + assertTrue(out3.contains("file1line3")); + assertTrue(out3.contains("file1line4")); + + reader.commit(); + List<String> out4 = bodiesAsStrings(reader.readEvents(15)); + assertTrue(out4.contains("file1line5")); + assertTrue(out4.contains("file1line6")); + assertTrue(out4.contains("file1line7")); + assertTrue(out4.contains("file1line8")); + } + + @Test + public void testIncludeEmptyFile() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + Files.touch(f2); + + ReliableTaildirEventReader reader = getReader(); + // Expect to read nothing from empty file + List<String> out = Lists.newArrayList(); + for (TailFile tf : reader.getTailFiles().values()) { + out.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + assertEquals(2, out.size()); + assertTrue(out.contains("file1line1")); + assertTrue(out.contains("file1line2")); + assertNull(reader.readEvent()); + } + + @Test + public void testBackoffWithoutNewLine() throws IOException { + File f1 = new 
File(tmpDir, "file1"); + Files.write("file1line1\nfile1", f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out = Lists.newArrayList(); + // Expect to read only the line with newline + for (TailFile tf : reader.getTailFiles().values()) { + out.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + assertEquals(1, out.size()); + assertTrue(out.contains("file1line1")); + + Files.append("line2\nfile1line3\nfile1line4", f1, Charsets.UTF_8); + + for (TailFile tf : reader.getTailFiles().values()) { + out.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + assertEquals(3, out.size()); + assertTrue(out.contains("file1line2")); + assertTrue(out.contains("file1line3")); + + // Should read the last line if it finally has no newline + out.addAll(bodiesAsStrings(reader.readEvents(5, false))); + reader.commit(); + assertEquals(4, out.size()); + assertTrue(out.contains("file1line4")); + } + + @Test + public void testBatchedReadsAcrossFileBoundary() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out1 = Lists.newArrayList(); + for (TailFile tf : reader.getTailFiles().values()) { + out1.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + + File f2 = new File(tmpDir, "file2"); + Files.write("file2line1\nfile2line2\nfile2line3\nfile2line4\n" + + "file2line5\nfile2line6\nfile2line7\nfile2line8\n", + f2, Charsets.UTF_8); + + List<String> out2 = bodiesAsStrings(reader.readEvents(5)); + reader.commit(); + + reader.updateTailFiles(); + List<String> out3 = Lists.newArrayList(); + for (TailFile tf : reader.getTailFiles().values()) { + out3.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + + // Should have first 5 lines of file1 + 
assertEquals(5, out1.size()); + assertTrue(out1.contains("file1line1")); + assertTrue(out1.contains("file1line2")); + assertTrue(out1.contains("file1line3")); + assertTrue(out1.contains("file1line4")); + assertTrue(out1.contains("file1line5")); + + // Should have 3 remaining lines of file1 + assertEquals(3, out2.size()); + assertTrue(out2.contains("file1line6")); + assertTrue(out2.contains("file1line7")); + assertTrue(out2.contains("file1line8")); + + // Should have first 5 lines of file2 + assertEquals(5, out3.size()); + assertTrue(out3.contains("file2line1")); + assertTrue(out3.contains("file2line2")); + assertTrue(out3.contains("file2line3")); + assertTrue(out3.contains("file2line4")); + assertTrue(out3.contains("file2line5")); + } + + @Test + public void testLargeNumberOfFiles() throws IOException { + int fileNum = 1000; + Set<String> expected = Sets.newHashSet(); + + for (int i = 0; i < fileNum; i++) { + String data = "data" + i; + File f = new File(tmpDir, "file" + i); + Files.write(data + "\n", f, Charsets.UTF_8); + expected.add(data); + } + + ReliableTaildirEventReader reader = getReader(); + for (TailFile tf : reader.getTailFiles().values()) { + List<Event> events = reader.readEvents(tf, 10); + for (Event e : events) { + expected.remove(new String(e.getBody())); + } + reader.commit(); + } + assertEquals(0, expected.size()); + } + + @Test + public void testLoadPositionFile() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + File f3 = new File(tmpDir, "file3"); + + Files.write("file1line1\nfile1line2\nfile1line3\n", f1, Charsets.UTF_8); + Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8); + Files.write("file3line1\n", f3, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + Map<Long, TailFile> tailFiles = reader.getTailFiles(); + + long pos = f2.length(); + int i = 1; + File posFile = new File(posFilePath); + for (TailFile tf : tailFiles.values()) { + Files.append(i == 1 ? 
"[" : "", posFile, Charsets.UTF_8); + Files.append(String.format("{\"inode\":%s,\"pos\":%s,\"file\":\"%s\"}", + tf.getInode(), pos, tf.getPath()), posFile, Charsets.UTF_8); + Files.append(i == 3 ? "]" : ",", posFile, Charsets.UTF_8); + i++; + } + reader.loadPositionFile(posFilePath); + + for (TailFile tf : tailFiles.values()) { + if (tf.getPath().equals(tmpDir + "file3")) { + // when given position is larger than file size + assertEquals(0, tf.getPos()); + } else { + assertEquals(pos, tf.getPos()); + } + } + } + + @Test + public void testSkipToEndPosition() throws IOException { + ReliableTaildirEventReader reader = getReader(); + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + + reader.updateTailFiles(); + for (TailFile tf : reader.getTailFiles().values()) { + if (tf.getPath().equals(tmpDir + "file1")) { + assertEquals(0, tf.getPos()); + } + } + + File f2 = new File(tmpDir, "file2"); + Files.write("file2line1\nfile2line2\n", f2, Charsets.UTF_8); + // Expect to skip to EOF the read position when skipToEnd option is true + reader.updateTailFiles(true); + for (TailFile tf : reader.getTailFiles().values()) { + if (tf.getPath().equals(tmpDir + "file2")) { + assertEquals(f2.length(), tf.getPos()); + } + } + } + + @Test + public void testByteOffsetHeader() throws IOException { + File f1 = new File(tmpDir, "file1"); + String line1 = "file1line1\n"; + String line2 = "file1line2\n"; + String line3 = "file1line3\n"; + Files.write(line1 + line2 + line3, f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(true); + List<String> headers = null; + for (TailFile tf : reader.getTailFiles().values()) { + headers = headersAsStrings(reader.readEvents(tf, 5), BYTE_OFFSET_HEADER_KEY); + reader.commit(); + } + assertEquals(3, headers.size()); + // Make sure we got byte offset position + assertTrue(headers.contains(String.valueOf(0))); + assertTrue(headers.contains(String.valueOf(line1.length()))); + 
assertTrue(headers.contains(String.valueOf((line1 + line2).length()))); + } + + @Test + public void testNewLineBoundaries() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n", f1, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + List<String> out = Lists.newArrayList(); + for (TailFile tf : reader.getTailFiles().values()) { + out.addAll(bodiesAsStrings(reader.readEvents(tf, 5))); + reader.commit(); + } + assertEquals(4, out.size()); + //Should treat \n as line boundary + assertTrue(out.contains("file1line1")); + //Should not treat \r as line boundary + assertTrue(out.contains("file1line2\rfile1line2")); + //Should treat \r\n as line boundary + assertTrue(out.contains("file1line3")); + assertTrue(out.contains("file1line4")); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java new file mode 100644 index 0000000..f9e614c --- /dev/null +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.flume.source.taildir; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.conf.Configurables; +import org.apache.flume.lifecycle.LifecycleController; +import org.apache.flume.lifecycle.LifecycleState; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.Files; + +public class TestTaildirSource { + static TaildirSource source; + static MemoryChannel channel; + private File tmpDir; + private String posFilePath; + + @Before + public void setUp() { + source = new TaildirSource(); + channel = new MemoryChannel(); + + Configurables.configure(channel, new Context()); + + List<Channel> channels = new ArrayList<Channel>(); + channels.add(channel); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(channels); + + source.setChannelProcessor(new ChannelProcessor(rcs)); + tmpDir = Files.createTempDir(); + posFilePath = 
tmpDir.getAbsolutePath() + "/taildir_position_test.json"; + } + + @After + public void tearDown() { + for (File f : tmpDir.listFiles()) { + f.delete(); + } + tmpDir.delete(); + } + + @Test + public void testRegexFileNameFiltering() throws IOException { + File f1 = new File(tmpDir, "a.log"); + File f2 = new File(tmpDir, "a.log.1"); + File f3 = new File(tmpDir, "b.log"); + File f4 = new File(tmpDir, "c.log.yyyy-MM-01"); + File f5 = new File(tmpDir, "c.log.yyyy-MM-02"); + Files.write("a.log\n", f1, Charsets.UTF_8); + Files.write("a.log.1\n", f2, Charsets.UTF_8); + Files.write("b.log\n", f3, Charsets.UTF_8); + Files.write("c.log.yyyy-MM-01\n", f4, Charsets.UTF_8); + Files.write("c.log.yyyy-MM-02\n", f5, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "ab c"); + // Tail a.log and b.log + context.put(FILE_GROUPS_PREFIX + "ab", tmpDir.getAbsolutePath() + "/[ab].log"); + // Tail files that starts with c.log + context.put(FILE_GROUPS_PREFIX + "c", tmpDir.getAbsolutePath() + "/c.log.*"); + + Configurables.configure(source, context); + source.start(); + source.process(); + Transaction txn = channel.getTransaction(); + txn.begin(); + List<String> out = Lists.newArrayList(); + for (int i = 0; i < 5; i++) { + Event e = channel.take(); + if (e != null) { + out.add(TestTaildirEventReader.bodyAsString(e)); + } + } + txn.commit(); + txn.close(); + + assertEquals(4, out.size()); + // Make sure we got every file + assertTrue(out.contains("a.log")); + assertFalse(out.contains("a.log.1")); + assertTrue(out.contains("b.log")); + assertTrue(out.contains("c.log.yyyy-MM-01")); + assertTrue(out.contains("c.log.yyyy-MM-02")); + } + + @Test + public void testHeaderMapping() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + File f3 = new File(tmpDir, "file3"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + Files.write("file2line1\nfile2line2\n", f2, 
Charsets.UTF_8); + Files.write("file3line1\nfile3line2\n", f3, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "f1 f2 f3"); + context.put(FILE_GROUPS_PREFIX + "f1", tmpDir.getAbsolutePath() + "/file1$"); + context.put(FILE_GROUPS_PREFIX + "f2", tmpDir.getAbsolutePath() + "/file2$"); + context.put(FILE_GROUPS_PREFIX + "f3", tmpDir.getAbsolutePath() + "/file3$"); + context.put(HEADERS_PREFIX + "f1.headerKeyTest", "value1"); + context.put(HEADERS_PREFIX + "f2.headerKeyTest", "value2"); + context.put(HEADERS_PREFIX + "f2.headerKeyTest2", "value2-2"); + + Configurables.configure(source, context); + source.start(); + source.process(); + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 6; i++) { + Event e = channel.take(); + String body = new String(e.getBody(), Charsets.UTF_8); + String headerValue = e.getHeaders().get("headerKeyTest"); + String headerValue2 = e.getHeaders().get("headerKeyTest2"); + if (body.startsWith("file1")) { + assertEquals("value1", headerValue); + assertNull(headerValue2); + } else if (body.startsWith("file2")) { + assertEquals("value2", headerValue); + assertEquals("value2-2", headerValue2); + } else if (body.startsWith("file3")) { + // No header + assertNull(headerValue); + assertNull(headerValue2); + } + } + txn.commit(); + txn.close(); + } + + @Test + public void testLifecycle() throws IOException, InterruptedException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "f1"); + context.put(FILE_GROUPS_PREFIX + "f1", tmpDir.getAbsolutePath() + "/file1$"); + Configurables.configure(source, context); + + for (int i = 0; i < 3; i++) { + source.start(); + source.process(); + assertTrue("Reached start or error", LifecycleController.waitForOneOf( + source, 
LifecycleState.START_OR_ERROR)); + assertEquals("Server is started", LifecycleState.START, + source.getLifecycleState()); + + source.stop(); + assertTrue("Reached stop or error", + LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR)); + assertEquals("Server is stopped", LifecycleState.STOP, + source.getLifecycleState()); + } + } + + @Test + public void testFileConsumeOrder() throws IOException { + System.out.println(tmpDir.toString()); + // 1) Create 1st file + File f1 = new File(tmpDir, "file1"); + String line1 = "file1line1\n"; + String line2 = "file1line2\n"; + String line3 = "file1line3\n"; + Files.write(line1 + line2 + line3, f1, Charsets.UTF_8); + try { + Thread.sleep(1000); // wait before creating a new file + } catch (InterruptedException e) { + } + + // 1) Create 2nd file + String line1b = "file2line1\n"; + String line2b = "file2line2\n"; + String line3b = "file2line3\n"; + File f2 = new File(tmpDir, "file2"); + Files.write(line1b + line2b + line3b, f2, Charsets.UTF_8); + try { + Thread.sleep(1000); // wait before creating next file + } catch (InterruptedException e) { + } + + // 3) Create 3rd file + String line1c = "file3line1\n"; + String line2c = "file3line2\n"; + String line3c = "file3line3\n"; + File f3 = new File(tmpDir, "file3"); + Files.write(line1c + line2c + line3c, f3, Charsets.UTF_8); + + try { + Thread.sleep(1000); // wait before creating a new file + } catch (InterruptedException e) { + } + + + // 4) Create 4th file + String line1d = "file4line1\n"; + String line2d = "file4line2\n"; + String line3d = "file4line3\n"; + File f4 = new File(tmpDir, "file4"); + Files.write(line1d + line2d + line3d, f4, Charsets.UTF_8); + + try { + Thread.sleep(1000); // wait before creating a new file + } catch (InterruptedException e) { + } + + + // 5) Now update the 3rd file so that its the latest file and gets consumed last + f3.setLastModified(System.currentTimeMillis()); + + // 4) Consume the files + ArrayList<String> consumedOrder = 
Lists.newArrayList(); + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "g1"); + context.put(FILE_GROUPS_PREFIX + "g1", tmpDir.getAbsolutePath() + "/.*"); + + Configurables.configure(source, context); + source.start(); + source.process(); + Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 12; i++) { + Event e = channel.take(); + String body = new String(e.getBody(), Charsets.UTF_8); + consumedOrder.add(body); + } + txn.commit(); + txn.close(); + + System.out.println(consumedOrder); + + // 6) Ensure consumption order is in order of last update time + ArrayList<String> expected = Lists.newArrayList(line1, line2, line3, // file1 + line1b, line2b, line3b, // file2 + line1d, line2d, line3d, // file4 + line1c, line2c, line3c // file3 + ); + for(int i =0; i!=expected.size(); ++i) { + expected.set(i, expected.get(i).trim() ); + } + assertArrayEquals("Files not consumed in expected order", expected.toArray(), consumedOrder.toArray()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/flume-ng-sources/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml index 79de5fa..f526956 100644 --- a/flume-ng-sources/pom.xml +++ b/flume-ng-sources/pom.xml @@ -45,6 +45,7 @@ limitations under the License. <module>flume-jms-source</module> <module>flume-twitter-source</module> <module>flume-kafka-source</module> + <module>flume-taildir-source</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/d02013f4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 448c6ef..3b8ae53 100644 --- a/pom.xml +++ b/pom.xml @@ -1226,6 +1226,12 @@ limitations under the License. 
</dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-taildir-source</artifactId> + <version>1.7.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.7.0-SNAPSHOT</version>
