HADOOP-12759. RollingFileSystemSink should eagerly rotate directories. Contributed by Daniel Templeton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b59a0ea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b59a0ea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b59a0ea

Branch: refs/heads/yarn-2877
Commit: 5b59a0ea85c923384e36ad7c036e751551774142
Parents: 1495ff9
Author: Andrew Wang <w...@apache.org>
Authored: Sat Feb 6 20:52:35 2016 -0800
Committer: Andrew Wang <w...@apache.org>
Committed: Sat Feb 6 20:52:35 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../metrics2/sink/RollingFileSystemSink.java    | 275 ++++++++++++++-----
 .../sink/RollingFileSystemSinkTestBase.java     |  27 +-
 3 files changed, 224 insertions(+), 81 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3492fdb..dbfa482 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1102,6 +1102,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12292. Make use of DeleteObjects optional. (Thomas Demoor via
     stevel)
 
+    HADOOP-12759. RollingFileSystemSink should eagerly rotate directories.
+    (Daniel Templeton via wang)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index 8271362..de403ca 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.metrics2.sink;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -25,8 +26,11 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.lang.time.FastDateFormat;
@@ -35,7 +39,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsRecord;
@@ -43,7 +49,7 @@ import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 
 /**
- * This class is a metrics sink that uses
+ * <p>This class is a metrics sink that uses
  * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs. Every
  * hour a new directory will be created under the path specified by the
  * <code>basepath</code> property. All metrics will be logged to a file in the
@@ -54,51 +60,53 @@ import org.apache.hadoop.metrics2.MetricsTag;
  * time zone used to create the current hour's directory name is GMT. If the
  * <code>basepath</code> property isn't specified, it will default to
  * "/tmp", which is the temp directory on whatever default file
- * system is configured for the cluster.
+ * system is configured for the cluster.</p>
  *
- * The <code><prefix>.sink.<instance>.ignore-error</code> property
- * controls whether an exception is thrown when an error is encountered writing
- * a log file. The default value is <code>true</code>. When set to
- * <code>false</code>, file errors are quietly swallowed.
+ * <p>The <code><prefix>.sink.<instance>.ignore-error</code>
+ * property controls whether an exception is thrown when an error is encountered
+ * writing a log file. The default value is <code>true</code>. When set to
+ * <code>true</code>, file errors are quietly swallowed.</p>
  *
- * The primary use of this class is for logging to HDFS. As it uses
+ * <p>The primary use of this class is for logging to HDFS. As it uses
  * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
  * however, it can be used to write to the local file system, Amazon S3, or any
  * other supported file system. The base path for the sink will determine the
  * file system used. An unqualified path will write to the default file system
- * set by the configuration.
+ * set by the configuration.</p>
  *
- * Not all file systems support the ability to append to files. In file systems
- * without the ability to append to files, only one writer can write to a file
- * at a time. To allow for concurrent writes from multiple daemons on a single
- * host, the <code>source</code> property should be set to the name of the
- * source daemon, e.g. <i>namenode</i>. The value of the <code>source</code>
- * property should typically be the same as the property's prefix. If this
- * property is not set, the source is taken to be <i>unknown</i>.
+ * <p>Not all file systems support the ability to append to files. In file
+ * systems without the ability to append to files, only one writer can write to
+ * a file at a time. To allow for concurrent writes from multiple daemons on a
+ * single host, the <code>source</code> property should be set to the name of
+ * the source daemon, e.g. <i>namenode</i>. The value of the
+ * <code>source</code> property should typically be the same as the property's
+ * prefix. If this property is not set, the source is taken to be
+ * <i>unknown</i>.</p>
 *
- * Instead of appending to an existing file, by default the sink
+ * <p>Instead of appending to an existing file, by default the sink
  * will create a new file with a suffix of ".<n>", where
  * <i>n</i> is the next lowest integer that isn't already used in a file name,
  * similar to the Hadoop daemon logs. NOTE: the file with the <b>highest</b>
- * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.
+ * sequence number is the <b>newest</b> file, unlike the Hadoop daemon logs.</p>
 *
- * For file systems that allow append, the sink supports appending to the
+ * <p>For file systems that allow append, the sink supports appending to the
  * existing file instead. If the <code>allow-append</code> property is set to
  * true, the sink will instead append to the existing file on file systems that
  * support appends. By default, the <code>allow-append</code> property is
- * false.
+ * false.</p>
 *
- * Note that when writing to HDFS with <code>allow-append</code> set to true,
+ * <p>Note that when writing to HDFS with <code>allow-append</code> set to true,
  * there is a minimum acceptable number of data nodes. If the number of data
  * nodes drops below that minimum, the append will succeed, but reading the
  * data will fail with an IOException in the DataStreamer class. The minimum
- * number of data nodes required for a successful append is generally 2 or 3.
+ * number of data nodes required for a successful append is generally 2 or
+ * 3.</p>
 *
- * Note also that when writing to HDFS, the file size information is not updated
- * until the file is closed (e.g. at the top of the hour) even though the data
- * is being written successfully. This is a known HDFS limitation that exists
- * because of the performance cost of updating the metadata. See
- * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.
+ * <p>Note also that when writing to HDFS, the file size information is not
+ * updated until the file is closed (e.g. at the top of the hour) even though
+ * the data is being written successfully. This is a known HDFS limitation that
+ * exists because of the performance cost of updating the metadata. See
+ * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
 */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -111,6 +119,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   private static final String BASEPATH_DEFAULT = "/tmp";
   private static final FastDateFormat DATE_FORMAT =
       FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+  private final Object lock = new Object();
   private String source;
   private boolean ignoreError;
   private boolean allowAppend;
@@ -124,6 +133,11 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   private PrintStream currentOutStream;
   // We keep this only to be able to call hflush() on it.
   private FSDataOutputStream currentFSOutStream;
+  private Timer flushTimer;
+  @VisibleForTesting
+  protected static boolean isTest = false;
+  @VisibleForTesting
+  protected static volatile boolean hasFlushed = false;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -147,6 +161,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
     if (allowAppend) {
       allowAppend = checkAppend(fileSystem);
     }
+
+    flushTimer = new Timer("RollingFileSystemSink Flusher", true);
   }
 
   /**
@@ -175,28 +191,72 @@
    * new directory or new log file
    */
   private void rollLogDirIfNeeded() throws MetricsException {
-    String currentDir = DATE_FORMAT.format(new Date());
+    Date now = new Date();
+    String currentDir = DATE_FORMAT.format(now);
     Path path = new Path(basePath, currentDir);
 
     // We check whether currentOutStream is null instead of currentDirPath,
     // because if currentDirPath is null, then currentOutStream is null, but
     // currentOutStream can be null for other reasons.
     if ((currentOutStream == null) || !path.equals(currentDirPath)) {
-      currentDirPath = path;
-
+      // Close the stream. This step could have been handled already by the
+      // flusher thread, but if it has, the PrintStream will just swallow the
+      // exception, which is fine.
       if (currentOutStream != null) {
         currentOutStream.close();
       }
 
+      currentDirPath = path;
+
       try {
         rollLogDir();
       } catch (IOException ex) {
-        throwMetricsException("Failed to creating new log file", ex);
+        throwMetricsException("Failed to create new log file", ex);
       }
+
+      scheduleFlush(now);
     }
   }
 
   /**
+   * Schedule the current hour's directory to be flushed at the top of the next
+   * hour. If this ends up running after the top of the next hour, it will
+   * execute immediately.
+   *
+   * @param now the current time
+   */
+  private void scheduleFlush(Date now) {
+    // Store the current stream so the timer task can close it later
+    final PrintStream toClose = currentOutStream;
+    Calendar next = Calendar.getInstance();
+
+    next.setTime(now);
+
+    if (isTest) {
+      // If we're running unit tests, flush after a short pause
+      next.add(Calendar.MILLISECOND, 400);
+    } else {
+      // Otherwise flush at the top of the hour
+      next.set(Calendar.SECOND, 0);
+      next.set(Calendar.MINUTE, 0);
+      next.add(Calendar.HOUR, 1);
+    }
+
+    flushTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        synchronized (lock) {
+          // This close may have already been done by a putMetrics() call. If it
+          // has, the PrintStream will swallow the exception, which is fine.
+          toClose.close();
+        }
+
+        hasFlushed = true;
+      }
+    }, next.getTime());
+  }
+
+  /**
    * Create a new directory based on the current hour and a new log file in
    * that directory.
    *
@@ -231,7 +291,9 @@
    */
   private void createLogFile(Path initial) throws IOException {
     Path currentAttempt = initial;
-    int id = 1;
+    // Start at 0 so that if the base filename exists, we start with the suffix
+    // ".1".
+    int id = 0;
 
     while (true) {
       // First try blindly creating the file. If we fail, it either means
@@ -248,8 +310,8 @@
       } catch (IOException ex) {
         // Now we can check to see if the file exists to know why we failed
         if (fileSystem.exists(currentAttempt)) {
+          id = getNextIdToTry(initial, id);
           currentAttempt = new Path(initial.toString() + "." + id);
-          id += 1;
         } else {
           throw ex;
         }
@@ -258,6 +320,66 @@
     }
   }
 
   /**
+   * Return the next ID suffix to use when creating the log file. This method
+   * will look at the files in the directory, find the one with the highest
+   * ID suffix, add 1 to that suffix, and return it. This approach saves a full
+   * linear probe, which matters in the case where there are a large number of
+   * log files.
+   *
+   * @param initial the base file path
+   * @param lastId the last ID value that was used
+   * @return the next ID to try
+   * @throws IOException thrown if there's an issue querying the files in the
+   * directory
+   */
+  private int getNextIdToTry(Path initial, int lastId)
+      throws IOException {
+    RemoteIterator<LocatedFileStatus> files =
+        fileSystem.listFiles(currentDirPath, true);
+    String base = initial.toString();
+    int id = lastId;
+
+    while (files.hasNext()) {
+      String file = files.next().getPath().getName();
+
+      if (file.startsWith(base)) {
+        int fileId = extractId(file);
+
+        if (fileId > id) {
+          id = fileId;
+        }
+      }
+    }
+
+    // Return either 1 more than the highest we found or 1 more than the last
+    // ID used (if no ID was found).
+    return id + 1;
+  }
+
+  /**
+   * Extract the ID from the suffix of the given file name.
+   *
+   * @param file the file name
+   * @return the ID or -1 if no ID could be extracted
+   */
+  private int extractId(String file) {
+    int index = file.lastIndexOf(".");
+    int id = -1;
+
+    // A hostname has to have at least 1 character
+    if (index > 0) {
+      try {
+        id = Integer.parseInt(file.substring(index + 1));
+      } catch (NumberFormatException ex) {
+        // This can happen if there's no suffix, but there is a dot in the
+        // hostname. Just ignore it.
+      }
+    }
+
+    return id;
+  }
+
+  /**
    * Create a new log file and return the {@link FSDataOutputStream}. If a
    * file with the specified path already exists, open the file for append
    * instead.
@@ -303,65 +425,72 @@
   @Override
   public void putMetrics(MetricsRecord record) {
-    rollLogDirIfNeeded();
+    synchronized (lock) {
+      rollLogDirIfNeeded();
 
-    if (currentOutStream != null) {
-      currentOutStream.printf("%d %s.%s", record.timestamp(),
-          record.context(), record.name());
+      if (currentOutStream != null) {
+        currentOutStream.printf("%d %s.%s", record.timestamp(),
+            record.context(), record.name());
 
-      String separator = ": ";
+        String separator = ": ";
 
-      for (MetricsTag tag : record.tags()) {
-        currentOutStream.printf("%s%s=%s", separator, tag.name(), tag.value());
-        separator = ", ";
-      }
+        for (MetricsTag tag : record.tags()) {
+          currentOutStream.printf("%s%s=%s", separator, tag.name(),
+              tag.value());
+          separator = ", ";
+        }
 
-      for (AbstractMetric metric : record.metrics()) {
-        currentOutStream.printf("%s%s=%s", separator, metric.name(),
-            metric.value());
-      }
+        for (AbstractMetric metric : record.metrics()) {
+          currentOutStream.printf("%s%s=%s", separator, metric.name(),
+              metric.value());
+        }
 
-      currentOutStream.println();
+        currentOutStream.println();
 
-      // If we don't hflush(), the data may not be written until the file is
-      // closed. The file won't be closed until the top of the hour *AND*
-      // another record is received. Calling hflush() makes sure that the data
-      // is complete at the top of the hour.
-      try {
-        currentFSOutStream.hflush();
-      } catch (IOException ex) {
-        throwMetricsException("Failed flushing the stream", ex);
-      }
+        // If we don't hflush(), the data may not be written until the file is
+        // closed. The file won't be closed until the top of the hour *AND*
+        // another record is received. Calling hflush() makes sure that the data
+        // is complete at the top of the hour.
+        try {
+          currentFSOutStream.hflush();
+        } catch (IOException ex) {
+          throwMetricsException("Failed flushing the stream", ex);
+        }
 
-      checkForErrors("Unable to write to log file");
-    } else if (!ignoreError) {
-      throwMetricsException("Unable to write to log file");
+        checkForErrors("Unable to write to log file");
+      } else if (!ignoreError) {
+        throwMetricsException("Unable to write to log file");
+      }
     }
   }
 
   @Override
   public void flush() {
-    // currentOutStream is null if currentFSOutStream is null
-    if (currentFSOutStream != null) {
-      try {
-        currentFSOutStream.hflush();
-      } catch (IOException ex) {
-        throwMetricsException("Unable to flush log file", ex);
+    synchronized (lock) {
+      // currentOutStream is null if currentFSOutStream is null
+      if (currentFSOutStream != null) {
+        try {
+          currentFSOutStream.hflush();
+        } catch (IOException ex) {
+          throwMetricsException("Unable to flush log file", ex);
+        }
       }
     }
   }
 
   @Override
-  public void close() throws IOException {
-    if (currentOutStream != null) {
-      currentOutStream.close();
+  public void close() {
+    synchronized (lock) {
+      if (currentOutStream != null) {
+        currentOutStream.close();
 
-      try {
-        checkForErrors("Unable to close log file");
-      } finally {
-        // Null out the streams just in case someone tries to reuse us.
-        currentOutStream = null;
-        currentFSOutStream = null;
+        try {
+          checkForErrors("Unable to close log file");
+        } finally {
+          // Null out the streams just in case someone tries to reuse us.
+          currentOutStream = null;
+          currentFSOutStream = null;
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b59a0ea/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index 3213276..292d1fc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
@@ -214,8 +215,7 @@
   protected String readLogFile(String path, String then, int count)
       throws IOException, URISyntaxException {
     final String now = DATE_FORMAT.format(new Date());
-    final String logFile =
-        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+    final String logFile = getLogFilename();
     FileSystem fs = FileSystem.get(new URI(path), new Configuration());
     StringBuilder metrics = new StringBuilder();
     boolean found = false;
@@ -258,7 +258,10 @@
   }
 
   /**
-   * Return the path to the log file to use, based on the target path.
+   * Return the path to the log file to use, based on the initial path. The
+   * initial path must be a valid log file path. This method will find the
+   * most recent version of the file.
+   *
+   * @param fs the target FileSystem
    * @param initial the path from which to start
    * @return the path to use
@@ -275,10 +278,20 @@
       nextLogFile = new Path(initial.toString() + "." + id);
       id += 1;
     } while (fs.exists(nextLogFile));
+
     return logFile;
   }
 
   /**
+   * Return the name of the log file for this host.
+   *
+   * @return the name of the log file for this host
+   */
+  protected static String getLogFilename() throws UnknownHostException {
+    return "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log";
+  }
+
+  /**
    * Assert that the given contents match what is expected from the test
    * metrics.
    *
@@ -392,8 +405,7 @@
 
     fs.mkdirs(dir);
 
-    Path file = new Path(dir,
-        "testsrc-" + InetAddress.getLocalHost().getHostName() + ".log");
+    Path file = new Path(dir, getLogFilename());
 
     // Create the log file to force the sink to append
     try (FSDataOutputStream out = fs.create(file)) {
@@ -405,8 +417,7 @@
     int count = 1;
 
     while (count < numFiles) {
-      file = new Path(dir, "testsrc-"
-          + InetAddress.getLocalHost().getHostName() + ".log." + count);
+      file = new Path(dir, getLogFilename() + "." + count);
 
       // Create the log file to force the sink to append
       try (FSDataOutputStream out = fs.create(file)) {
@@ -482,7 +493,7 @@
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       try {
         super.close();
       } catch (MetricsException ex) {