AMBARI-21387 Log Feeder input config attribute "tail" should be clearer (mgergely)
Change-Id: I0ca164df6b5b91d237f1503bc4b9e45a0df4b685 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8d9fd451 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8d9fd451 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8d9fd451 Branch: refs/heads/branch-feature-AMBARI-14714 Commit: 8d9fd451d5ad348d073d4adbe73970586ab71c37 Parents: 78ebbef Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Thu Jul 6 10:17:52 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Thu Jul 6 10:17:52 2017 +0200 ---------------------------------------------------------------------- .../config/zookeeper/LogSearchConfigZK.java | 12 +- .../logfeeder/input/AbstractInputFile.java | 105 +++++++++-------- .../apache/ambari/logfeeder/input/Input.java | 4 - .../ambari/logfeeder/input/InputFile.java | 23 ++-- .../ambari/logfeeder/input/InputManager.java | 113 +++++++++++-------- .../ambari/logfeeder/input/InputS3File.java | 11 +- .../logfeeder/input/InputManagerTest.java | 1 - 7 files changed, 152 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index 1926efa..827101c 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.config.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.ambari.logsearch.config.api.LogSearchConfig; @@ -53,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableSet; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; @@ -175,10 +177,14 @@ public class LogSearchConfigZK implements LogSearchConfig { globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global")); } - createGlobalConfigNode(globalConfigNode); - TreeCacheListener listener = new TreeCacheListener() { + private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED); + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + if (!nodeEvents.contains(event.getType())) { + return; + } + String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath()); String nodeData = new String(event.getData().getData()); Type eventType = event.getType(); @@ -265,6 +271,8 @@ public class LogSearchConfigZK implements LogSearchConfig { }; cache.getListenable().addListener(listener); cache.start(); + + createGlobalConfigNode(globalConfigNode); } private void createGlobalConfigNode(JsonArray globalConfigNode) { http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java index ab50eb7..2359256 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java @@ -31,7 +31,6 @@ import java.util.Map; import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; -import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang.ObjectUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -48,11 +47,12 @@ public abstract class AbstractInputFile extends Input { protected boolean isReady; private String checkPointExtension; - private File checkPointFile; - private long lastCheckPointTimeMS; private int checkPointIntervalMS; - private Map<String, Object> jsonCheckPoint; - private InputMarker lastCheckPointInputMarker; + + private Map<String, File> checkPointFiles = new HashMap<>(); + private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>(); + private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>(); + private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>(); @Override protected String getStatMetricName() { @@ -73,7 +73,6 @@ public abstract class AbstractInputFile extends Input { // Let's close the file and set it to true after we start monitoring it setClosed(true); logPath = inputDescriptor.getPath(); - tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail); checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS); if (StringUtils.isEmpty(logPath)) { @@ -89,11 +88,9 @@ public abstract class AbstractInputFile extends Input { super.init(); } - protected void processFile(File logPathFile) throws FileNotFoundException, IOException { + protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException { LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile); BufferedReader br = null; - checkPointFile = null; - jsonCheckPoint = null; int lineCount = 0; try { @@ -125,7 +122,7 @@ public abstract class AbstractInputFile extends Input { sleepIteration++; if (sleepIteration == 2) { flush(); - if (!tail) { + if (!follow) { LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount); break; } @@ -204,47 +201,50 @@ public abstract class AbstractInputFile extends Input { private int getResumeFromLineNumber() { int resumeFromLineNumber = 0; - if (tail) { - try { - LOG.info("Checking existing checkpoint file. " + getShortDescription()); + File checkPointFile = null; + try { + LOG.info("Checking existing checkpoint file. " + getShortDescription()); - String checkPointFileName = base64FileKey + checkPointExtension; - File checkPointFolder = inputManager.getCheckPointFolderFile(); - checkPointFile = new File(checkPointFolder, checkPointFileName); - if (!checkPointFile.exists()) { - LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning"); - } else { - try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) { - int contentSize = checkPointWriter.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointWriter.read(b, 0, contentSize); - if (readSize != contentSize) { - LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + - readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription()); - } else { - String jsonCheckPointStr = new String(b, 0, readSize); - jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + String checkPointFileName = base64FileKey + checkPointExtension; + File checkPointFolder = inputManager.getCheckPointFolderFile(); + checkPointFile = new File(checkPointFolder, checkPointFileName); + checkPointFiles.put(base64FileKey, checkPointFile); + Map<String, Object> jsonCheckPoint = null; + if (!checkPointFile.exists()) { + LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning"); + } else { + try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) { + int contentSize = checkPointWriter.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointWriter.read(b, 0, contentSize); + if (readSize != contentSize) { + LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + + readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription()); + } else { + String jsonCheckPointStr = new String(b, 0, readSize); + jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); - resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); + resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); - LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr + - ", resumeFromLineNumber=" + resumeFromLineNumber); - } - } catch (EOFException eofEx) { - LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " + - getShortDescription(), eofEx); - } - } - if (jsonCheckPoint == null) { - // This seems to be first time, so creating the initial checkPoint object - jsonCheckPoint = new HashMap<String, Object>(); - jsonCheckPoint.put("file_path", filePath); - jsonCheckPoint.put("file_key", base64FileKey); + LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr + + ", resumeFromLineNumber=" + resumeFromLineNumber); + } + } catch (EOFException eofEx) { + LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " + + getShortDescription(), eofEx); } - - } catch (Throwable t) { - LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t); } + if (jsonCheckPoint == null) { + // This seems to be first time, so creating the initial checkPoint object + jsonCheckPoint = new HashMap<String, Object>(); + jsonCheckPoint.put("file_path", filePath); + jsonCheckPoint.put("file_key", base64FileKey); + } + + jsonCheckPoints.put(base64FileKey, jsonCheckPoint); + + } catch (Throwable t) { + LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t); } return resumeFromLineNumber; @@ -253,6 +253,9 @@ public abstract class AbstractInputFile extends Input { @Override public synchronized void checkIn(InputMarker inputMarker) { try { + Map<String, Object> jsonCheckPoint = jsonCheckPoints.get(inputMarker.base64FileKey); + File checkPointFile = checkPointFiles.get(inputMarker.base64FileKey); + int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number"); if (lineNumber > inputMarker.lineNumber) { // Already wrote higher line number for this input @@ -260,12 +263,14 @@ public abstract class AbstractInputFile extends Input { } // If interval is greater than last checkPoint time, then write long currMS = System.currentTimeMillis(); - if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { + long lastCheckPointTimeMs = lastCheckPointTimeMSs.containsKey(inputMarker.base64FileKey) ? + lastCheckPointTimeMSs.get(inputMarker.base64FileKey) : 0; + if (!isClosed() && (currMS - lastCheckPointTimeMs < checkPointIntervalMS)) { // Let's save this one so we can update the check point file on flush - lastCheckPointInputMarker = inputMarker; + lastCheckPointInputMarkers.put(inputMarker.base64FileKey, inputMarker); return; } - lastCheckPointTimeMS = currMS; + lastCheckPointTimeMSs.put(inputMarker.base64FileKey, currMS); jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber)); jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); @@ -299,7 +304,7 @@ public abstract class AbstractInputFile extends Input { @Override public void lastCheckIn() { - if (lastCheckPointInputMarker != null) { + for (InputMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) { checkIn(lastCheckPointInputMarker); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index c36f96b..49151e7 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -270,10 +270,6 @@ public abstract class Input extends ConfigItem implements Runnable { } } - public boolean isTail() { - return tail; - } - public boolean isUseEventMD5() { return useEventMD5; } http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index fc40ca4..e24a7aa 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -22,6 +22,8 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; +import java.util.Arrays; +import java.util.Comparator; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; import org.apache.ambari.logfeeder.util.FileUtil; @@ -41,7 +43,7 @@ public class InputFile extends AbstractInputFile { if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) { if (tail && logFiles.length > 1) { LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + - ". Will use only the first one. Using " + logFiles[0].getAbsolutePath()); + ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath()); } LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath()); isReady = true; @@ -58,7 +60,15 @@ public class InputFile extends AbstractInputFile { return new File[]{searchFile}; } else { FileFilter fileFilter = new WildcardFileFilter(searchFile.getName()); - return searchFile.getParentFile().listFiles(fileFilter); + File[] logFiles = searchFile.getParentFile().listFiles(fileFilter); + Arrays.sort(logFiles, + new Comparator<File>() { + @Override + public int compare(File o1, File o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + return logFiles; } } @@ -66,12 +76,11 @@ public class InputFile extends AbstractInputFile { void start() throws Exception { boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true); if (isProcessFile) { - if (tail) { - processFile(logFiles[0]); - } else { - for (File file : logFiles) { + for (int i = logFiles.length - 1; i >= 0; i--) { + File file = logFiles[i]; + if (i == 0 || !tail) { try { - processFile(file); + processFile(file, i == 0); if (isClosed() || isDrain()) { LOG.info("isClosed or isDrain. Now breaking loop."); break; http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java index 19894ae..01a11ec 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -197,13 +197,9 @@ public class InputManager { if (input.isReady()) { input.monitor(); } else { - if (input.isTail()) { - LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " + - "So it might not be an issue. " + input.getShortDescription()); - notReadyList.add(input); - } else { - LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription()); - } + LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " + + "So it might not be an issue. " + input.getShortDescription()); + notReadyList.add(input); } } catch (Exception e) { LOG.error("Error initializing input. " + input.getShortDescription(), e); @@ -279,46 +275,8 @@ public class InputManager { File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter); int totalCheckFilesDeleted = 0; for (File checkPointFile : checkPointFiles) { - try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) { - int contentSize = checkPointReader.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointReader.read(b, 0, contentSize); - if (readSize != contentSize) { - LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" - + readSize + ", checkPointFile=" + checkPointFile); - } else { - String jsonCheckPointStr = new String(b, 0, readSize); - Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); - - String logFilePath = (String) jsonCheckPoint.get("file_path"); - String logFileKey = (String) jsonCheckPoint.get("file_key"); - if (logFilePath != null && logFileKey != null) { - boolean deleteCheckPointFile = false; - File logFile = new File(logFilePath); - if (logFile.exists()) { - Object fileKeyObj = FileUtil.getFileKey(logFile); - String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes()); - if (!logFileKey.equals(fileBase64)) { - deleteCheckPointFile = true; - LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" + - logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath()); - } - } else { - LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" + - checkPointFile.getAbsolutePath()); - deleteCheckPointFile = true; - } - if (deleteCheckPointFile) { - LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath); - checkPointFile.delete(); - totalCheckFilesDeleted++; - } - } - } - } catch (EOFException eof) { - LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile); - } catch (Throwable t) { - LOG.error("Error while checking checkPoint file. " + checkPointFile, t); + if (checkCheckPointFile(checkPointFile)) { + totalCheckFilesDeleted++; } } LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" + @@ -329,6 +287,67 @@ public class InputManager { } } + private boolean checkCheckPointFile(File checkPointFile) { + boolean deleted = false; + try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) { + int contentSize = checkPointReader.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointReader.read(b, 0, contentSize); + if (readSize != contentSize) { + LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + + readSize + ", checkPointFile=" + checkPointFile); + } else { + String jsonCheckPointStr = new String(b, 0, readSize); + Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + + String logFilePath = (String) jsonCheckPoint.get("file_path"); + String logFileKey = (String) jsonCheckPoint.get("file_key"); + if (logFilePath != null && logFileKey != null) { + boolean deleteCheckPointFile = false; + File logFile = new File(logFilePath); + if (logFile.exists()) { + Object fileKeyObj = FileUtil.getFileKey(logFile); + String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes()); + if (!logFileKey.equals(fileBase64)) { + LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" + + logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath()); + deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey); + } + } else { + LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" + + checkPointFile.getAbsolutePath()); + deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey); + } + if (deleteCheckPointFile) { + LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath); + checkPointFile.delete(); + deleted = true; + } + } + } + } catch (EOFException eof) { + LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile); + } catch (Throwable t) { + LOG.error("Error while checking checkPoint file. " + checkPointFile, t); + } + + return deleted; + } + + private boolean wasFileRenamed(File folder, String searchFileBase64) { + for (File file : folder.listFiles()) { + Object fileKeyObj = FileUtil.getFileKey(file); + String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes()); + if (searchFileBase64.equals(fileBase64)) { + // even though the file name in the checkpoint file is different from the one it was renamed to, checkpoint files are + // identified by their name, which is generated from the file key, which would be the same for the renamed file + LOG.info("CheckPoint clean: File key matches file " + file.getAbsolutePath() + ", it must have been renamed"); + return true; + } + } + return false; + } + public void waitOnAllInputs() { //wait on inputs for (List<Input> inputList : inputs.values()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index 2b19503..69d053a 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -59,18 +59,17 @@ public class InputS3File extends AbstractInputFile { return; } - if (tail) { - processFile(logFiles[0]); - } else { - for (File s3FilePath : logFiles) { + for (int i = logFiles.length - 1; i >= 0; i--) { + File file = logFiles[i]; + if (i == 0 || !tail) { try { - processFile(s3FilePath); + processFile(file, i == 0); if (isClosed() || isDrain()) { LOG.info("isClosed or isDrain. Now breaking loop."); break; } } catch (Throwable t) { - LOG.error("Error processing file=" + s3FilePath, t); + LOG.error("Error processing file=" + file.getAbsolutePath(), t); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java index e9bbe7e..625e362 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java @@ -73,7 +73,6 @@ public class InputManagerTest { expect(input1.monitor()).andReturn(false); expect(input2.monitor()).andReturn(false); - expect(input3.isTail()).andReturn(false); expect(input3.getShortDescription()).andReturn("").once(); replay(input1, input2, input3);