HADOOP-12759. RollingFileSystemSink should eagerly rotate directories. 
Contributed by Daniel Templeton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b59a0ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b59a0ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b59a0ea

Branch: refs/heads/yarn-2877
Commit: 5b59a0ea85c923384e36ad7c036e751551774142
Parents: 1495ff9
Author: Andrew Wang <w...@apache.org>
Authored: Sat Feb 6 20:52:35 2016 -0800
Committer: Andrew Wang <w...@apache.org>
Committed: Sat Feb 6 20:52:35 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../metrics2/sink/RollingFileSystemSink.java    | 275 ++++++++++++++-----
 .../sink/RollingFileSystemSinkTestBase.java     |  27 +-
 3 files changed, 224 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3492fdb..dbfa482 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1102,6 +1102,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12292. Make use of DeleteObjects optional.
     (Thomas Demoor via stevel)
 
+    HADOOP-12759. RollingFileSystemSink should eagerly rotate directories.
+    (Daniel Templeton via wang)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index 8271362..de403ca 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -25,8 +26,11 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.lang.time.FastDateFormat;
@@ -35,7 +39,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
@@ -43,7 +49,7 @@ import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 
 /**
- * This class is a metrics sink that uses
+ * <p>This class is a metrics sink that uses
  * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
  * hour a new directory will be created under the path specified by the
  * <code>basepath</code> property. All metrics will be logged to a file in the
@@ -54,51 +60,53 @@ import org.apache.hadoop.metrics2.MetricsTag;
  * time zone used to create the current hour's directory name is GMT.  If the
  * <code>basepath</code> property isn't specified, it will default to
  * &quot;/tmp&quot;, which is the temp directory on whatever default file
- * system is configured for the cluster.
+ * system is configured for the cluster.</p>
  *
- * The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code> property
- * controls whether an exception is thrown when an error is encountered writing
- * a log file.  The default value is <code>true</code>.  When set to
- * <code>false</code>, file errors are quietly swallowed.
+ * <p>The <code>&lt;prefix&gt;.sink.&lt;instance&gt;.ignore-error</code>
+ * property controls whether an exception is thrown when an error is encountered
+ * writing a log file.  The default value is <code>true</code>.  When set to
+ * <code>false</code>, file errors are quietly swallowed.</p>
  *
- * The primary use of this class is for logging to HDFS.  As it uses
+ * <p>The primary use of this class is for logging to HDFS.  As it uses
  * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
  * however, it can be used to write to the local file system, Amazon S3, or any
  * other supported file system.  The base path for the sink will determine the
  * file system used.  An unqualified path will write to the default file system
- * set by the configuration.
+ * set by the configuration.</p>
  *
- * Not all file systems support the ability to append to files.  In file 
systems
- * without the ability to append to files, only one writer can write to a file
- * at a time.  To allow for concurrent writes from multiple daemons on a single
- * host, the <code>source</code> property should be set to the name of the
- * source daemon, e.g. <i>namenode</i>.  The value of the <code>source</code>
- * property should typically be the same as the property's prefix.  If this
- * property is not set, the source is taken to be <i>unknown</i>.
+ * <p>Not all file systems support the ability to append to files.  In file
+ * systems without the ability to append to files, only one writer can write to
+ * a file at a time.  To allow for concurrent writes from multiple daemons on a
+ * single host, the <code>source</code> property should be set to the name of
+ * the source daemon, e.g. <i>namenode</i>.  The value of the
+ * <code>source</code> property should typically be the same as the property's
+ * prefix.  If this property is not set, the source is taken to be
+ * <i>unknown</i>.</p>
  *
- * Instead of appending to an existing file, by default the sink
+ * <p>Instead of appending to an existing file, by default the sink
  * will create a new file with a suffix of &quot;.&lt;n&gt;&quot;, where
  * <i>n</i> is the next lowest integer that isn't already used in a file name,
  * similar to the Hadoop daemon logs.  NOTE: the file with the <b>highest</b>
- * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
+ * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
  *
- * For file systems that allow append, the sink supports appending to the
+ * <p>For file systems that allow append, the sink supports appending to the
  * existing file instead. If the <code>allow-append</code> property is set to
  * true, the sink will instead append to the existing file on file systems that
  * support appends. By default, the <code>allow-append</code> property is
- * false.
+ * false.</p>
  *
- * Note that when writing to HDFS with <code>allow-append</code> set to true,
+ * <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
  * there is a minimum acceptable number of data nodes.  If the number of data
  * nodes drops below that minimum, the append will succeed, but reading the
  * data will fail with an IOException in the DataStreamer class.  The minimum
- * number of data nodes required for a successful append is generally 2 or 3.
+ * number of data nodes required for a successful append is generally 2 or
+ * 3.</p>
  *
- * Note also that when writing to HDFS, the file size information is not 
updated
- * until the file is closed (e.g. at the top of the hour) even though the data
- * is being written successfully. This is a known HDFS limitation that exists
- * because of the performance cost of updating the metadata.  See
- * <a href="https://issues.apache.org/jira/browse/HDFS-5478";>HDFS-5478</a>.
+ * <p>Note also that when writing to HDFS, the file size information is not
+ * updated until the file is closed (e.g. at the top of the hour) even though
+ * the data is being written successfully. This is a known HDFS limitation that
+ * exists because of the performance cost of updating the metadata.  See
+ * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -111,6 +119,7 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
   private static final String BASEPATH_DEFAULT = "/tmp";
   private static final FastDateFormat DATE_FORMAT =
       FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+  private final Object lock = new Object();
   private String source;
   private boolean ignoreError;
   private boolean allowAppend;
@@ -124,6 +133,11 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
   private PrintStream currentOutStream;
   // We keep this only to be able to call hsynch() on it.
   private FSDataOutputStream currentFSOutStream;
+  private Timer flushTimer;
+  @VisibleForTesting
+  protected static boolean isTest = false;
+  @VisibleForTesting
+  protected static volatile boolean hasFlushed = false;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -147,6 +161,8 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
     if (allowAppend) {
       allowAppend = checkAppend(fileSystem);
     }
+
+    flushTimer = new Timer("RollingFileSystemSink Flusher", true);
   }
 
   /**
@@ -175,28 +191,72 @@ public class RollingFileSystemSink implements 
MetricsSink, Closeable {
    * new directory or new log file
    */
   private void rollLogDirIfNeeded() throws MetricsException {
-    String currentDir = DATE_FORMAT.format(new Date());
+    Date now = new Date();
+    String currentDir = DATE_FORMAT.format(now);
     Path path = new Path(basePath, currentDir);
 
     // We check whether currentOutStream is null instead of currentDirPath,
     // because if currentDirPath is null, then currentOutStream is null, but
     // currentOutStream can be null for other reasons.
     if ((currentOutStream == null) || !path.equals(currentDirPath)) {
-      currentDirPath = path;
-
+      // Close the stream. This step could have been handled already by the
+      // flusher thread, but if it has, the PrintStream will just swallow the
+      // exception, which is fine.
       if (currentOutStream != null) {
         currentOutStream.close();
       }
 
+      currentDirPath = path;
+
       try {
         rollLogDir();
       } catch (IOException ex) {
-        throwMetricsException("Failed to creating new log file", ex);
+        throwMetricsException("Failed to create new log file", ex);
       }
+
+      scheduleFlush(now);
     }
   }
 
   /**
+   * Schedule the current hour's directory to be flushed at the top of the next
+   * hour. If this ends up running after the top of the next hour, it will
+   * execute immediately.
+   *
+   * @param now the current time
+   */
+  private void scheduleFlush(Date now) {
+    // Store the current output stream so that it can be closed later
+    final PrintStream toClose = currentOutStream;
+    Calendar next = Calendar.getInstance();
+
+    next.setTime(now);
+
+    if (isTest) {
+      // If we're running unit tests, flush after a short pause
+      next.add(Calendar.MILLISECOND, 400);
+    } else {
+      // Otherwise flush at the top of the hour
+      next.set(Calendar.SECOND, 0);
+      next.set(Calendar.MINUTE, 0);
+      next.add(Calendar.HOUR, 1);
+    }
+
+    flushTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        synchronized (lock) {
+          // This close may have already been done by a putMetrics() call. If it
+          // has, the PrintStream will swallow the exception, which is fine.
+          toClose.close();
+        }
+
+        hasFlushed = true;
+      }
+    }, next.getTime());
+  }
+
+  /**
    * Create a new directory based on the current hour and a new log file in
    * that directory.
    *
@@ -231,7 +291,9 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
    */
   private void createLogFile(Path initial) throws IOException {
     Path currentAttempt = initial;
-    int id = 1;
+    // Start at 0 so that if the base filename exists, we start with the suffix
+    // ".1".
+    int id = 0;
 
     while (true) {
       // First try blindly creating the file. If we fail, it either means
@@ -248,8 +310,8 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
       } catch (IOException ex) {
         // Now we can check to see if the file exists to know why we failed
         if (fileSystem.exists(currentAttempt)) {
+          id = getNextIdToTry(initial, id);
           currentAttempt = new Path(initial.toString() + "." + id);
-          id += 1;
         } else {
           throw ex;
         }
@@ -258,6 +320,66 @@ public class RollingFileSystemSink implements MetricsSink, 
Closeable {
   }
 
   /**
+   * Return the next ID suffix to use when creating the log file. This method
+   * will look at the files in the directory, find the one with the highest
+   * ID suffix, add 1 to that suffix, and return it. This approach saves a full
+   * linear probe, which matters in the case where there are a large number of
+   * log files.
+   *
+   * @param initial the base file path
+   * @param lastId the last ID value that was used
+   * @return the next ID to try
+   * @throws IOException thrown if there's an issue querying the files in the
+   * directory
+   */
+  private int getNextIdToTry(Path initial, int lastId)
+      throws IOException {
+    RemoteIterator<LocatedFileStatus> files =
+        fileSystem.listFiles(currentDirPath, true);
+    String base = initial.toString();
+    int id = lastId;
+
+    while (files.hasNext()) {
+      String file = files.next().getPath().getName();
+
+      if (file.startsWith(base)) {
+        int fileId = extractId(file);
+
+        if (fileId > id) {
+          id = fileId;
+        }
+      }
+    }
+
+    // Return either 1 more than the highest we found or 1 more than the last
+    // ID used (if no ID was found).
+    return id + 1;
+  }
+
+  /**
+   * Extract the ID from the suffix of the given file name.
+   *
+   * @param file the file name
+   * @return the ID or -1 if no ID could be extracted
+   */
+  private int extractId(String file) {
+    int index = file.lastIndexOf(".");
+    int id = -1;
+
+    // A hostname has to have at least 1 character
+    if (index > 0) {
+      try {
+        id = Integer.parseInt(file.substring(index + 1));
+      } catch (NumberFormatException ex) {
+        // This can happen if there's no suffix, but there is a dot in the
+        // hostname.  Just ignore it.
+      }
+    }
+
+    return id;
+  }
+
+  /**
    * Create a new log file and return the {@link FSDataOutputStream}. If a
    * file with the specified path already exists, open the file for append
    * instead.
@@ -303,65 +425,72 @@ public class RollingFileSystemSink implements 
MetricsSink, Closeable {
 
   @Override
   public void putMetrics(MetricsRecord record) {
-    rollLogDirIfNeeded();
+    synchronized (lock) {
+      rollLogDirIfNeeded();
 
-    if (currentOutStream != null) {
-      currentOutStream.printf("%d %s.%s", record.timestamp(),
-          record.context(), record.name());
+      if (currentOutStream != null) {
+        currentOutStream.printf("%d %s.%s", record.timestamp(),
+            record.context(), record.name());
 
-      String separator = ": ";
+        String separator = ": ";
 
-      for (MetricsTag tag : record.tags()) {
-        currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
-        separator = ", ";
-      }
+        for (MetricsTag tag : record.tags()) {
+          currentOutStream.printf("%s%s=%s", separator, tag.name(),
+              tag.value());
+          separator = ", ";
+        }
 
-      for (AbstractMetric metric : record.metrics()) {
-        currentOutStream.printf("%s%s=%s", separator, metric.name(),
-            metric.value());
-      }
+        for (AbstractMetric metric : record.metrics()) {
+          currentOutStream.printf("%s%s=%s", separator, metric.name(),
+              metric.value());
+        }
 
-      currentOutStream.println();
+        currentOutStream.println();
 
-      // If we don't hflush(), the data may not be written until the file is
-      // closed. The file won't be closed until the top of the hour *AND*
-      // another record is received. Calling hflush() makes sure that the data
-      // is complete at the top of the hour.
-      try {
-        currentFSOutStream.hflush();
-      } catch (IOException ex) {
-        throwMetricsException("Failed flushing the stream", ex);
-      }
+        // If we don't hflush(), the data may not be written until the file is
+        // closed. The file won't be closed until the top of the hour *AND*
+        // another record is received. Calling hflush() makes sure that the data
+        // is complete at the top of the hour.
+        try {
+          currentFSOutStream.hflush();
+        } catch (IOException ex) {
+          throwMetricsException("Failed flushing the stream", ex);
+        }
 
-      checkForErrors("Unable to write to log file");
-    } else if (!ignoreError) {
-      throwMetricsException("Unable to write to log file");
+        checkForErrors("Unable to write to log file");
+      } else if (!ignoreError) {
+        throwMetricsException("Unable to write to log file");
+      }
     }
   }
 
   @Override
   public void flush() {
-    // currentOutStream is null if currentFSOutStream is null
-    if (currentFSOutStream != null) {
-      try {
-        currentFSOutStream.hflush();
-      } catch (IOException ex) {
-        throwMetricsException("Unable to flush log file", ex);
+    synchronized (lock) {
+      // currentOutStream is null if currentFSOutStream is null
+      if (currentFSOutStream != null) {
+        try {
+          currentFSOutStream.hflush();
+        } catch (IOException ex) {
+          throwMetricsException("Unable to flush log file", ex);
+        }
       }
     }
   }
 
   @Override
-  public void close() throws IOException {
-    if (currentOutStream != null) {
-      currentOutStream.close();
+  public void close() {
+    synchronized (lock) {
+      if (currentOutStream != null) {
+        currentOutStream.close();
 
-      try {
-        checkForErrors("Unable to close log file");
-      } finally {
-        // Null out the streams just in case someone tries to reuse us.
-        currentOutStream = null;
-        currentFSOutStream = null;
+        try {
+          checkForErrors("Unable to close log file");
+        } finally {
+          // Null out the streams just in case someone tries to reuse us.
+          currentOutStream = null;
+          currentFSOutStream = null;
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index 3213276..292d1fc 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
@@ -214,8 +215,7 @@ public class RollingFileSystemSinkTestBase {
   protected String readLogFile(String path, String then, int count)
       throws IOException, URISyntaxException {
     final String now = DATE_FORMAT.format(new Date());
-    final String logFile =
-        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+    final String logFile = getLogFilename();
     FileSystem fs = FileSystem.get(new URI(path), new Configuration());
     StringBuilder metrics = new StringBuilder();
     boolean found = false;
@@ -258,7 +258,10 @@ public class RollingFileSystemSinkTestBase {
   }
 
   /**
-   * Return the path to the log file to use, based on the target path.
+   * Return the path to the log file to use, based on the initial path. The
+   * initial path must be a valid log file path. This method will find the
+   * most recent version of the file.
+   *
    * @param fs the target FileSystem
    * @param initial the path from which to start
    * @return the path to use
@@ -275,10 +278,20 @@ public class RollingFileSystemSinkTestBase {
       nextLogFile = new Path(initial.toString() + "." + id);
       id += 1;
     } while (fs.exists(nextLogFile));
+
     return logFile;
   }
 
   /**
+   * Return the name of the log file for this host.
+   *
+   * @return the name of the log file for this host
+   */
+  protected static String getLogFilename() throws UnknownHostException {
+    return "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+  }
+
+  /**
    * Assert that the given contents match what is expected from the test
    * metrics.
    *
@@ -392,8 +405,7 @@ public class RollingFileSystemSinkTestBase {
 
     fs.mkdirs(dir);
 
-    Path file = new Path(dir,
-        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
+    Path file = new Path(dir, getLogFilename());
 
     // Create the log file to force the sink to append
     try (FSDataOutputStream out = fs.create(file)) {
@@ -405,8 +417,7 @@ public class RollingFileSystemSinkTestBase {
       int count = 1;
 
       while (count < numFiles) {
-        file = new Path(dir, "testsrc-"
-            + InetAddress.getLocalHost().getHostName() + ".log." + count);
+        file = new Path(dir, getLogFilename() + "." + count);
 
         // Create the log file to force the sink to append
         try (FSDataOutputStream out = fs.create(file)) {
@@ -482,7 +493,7 @@ public class RollingFileSystemSinkTestBase {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       try {
         super.close();
       } catch (MetricsException ex) {

Reply via email to