YARN-3949. Ensure timely flush of timeline writes. Contributed by Sangjin Lee.
(cherry picked from commit 967bef7e0396d857913caa2574afb103a5f0b81b) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/54eded54 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/54eded54 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/54eded54 Branch: refs/heads/YARN-2928-rebase Commit: 54eded5457133954c45a6c33032e63ff80eedd51 Parents: f2e9dcf Author: Junping Du <junping...@apache.org> Authored: Sat Jul 25 10:30:29 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:09 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 9 +++ .../src/main/resources/yarn-default.xml | 17 ++++- .../collector/TimelineCollectorManager.java | 65 ++++++++++++++++++-- .../storage/FileSystemTimelineWriterImpl.java | 5 ++ .../storage/HBaseTimelineWriterImpl.java | 6 ++ .../storage/PhoenixTimelineWriterImpl.java | 5 ++ .../timelineservice/storage/TimelineWriter.java | 9 +++ .../TestNMTimelineCollectorManager.java | 5 ++ 9 files changed, 119 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 53865dd..beba00e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -79,6 +79,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3047. [Data Serving] Set up ATS reader with basic request serving structure and lifecycle (Varun Saxena via sjlee) + YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via + junping_du) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b38f86f..4190a62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1593,6 +1593,15 @@ public class YarnConfiguration extends Configuration { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + /** The setting that controls how often the timeline collector flushes the + * timeline writer. + */ + public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = + TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds"; + + public static final int + DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ef8c9d4..adc28f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -782,7 +782,15 @@ <name>yarn.system-metrics-publisher.enabled</name> <value>false</value> </property> - + + <property> + <description>The setting that controls whether yarn container metrics is + published to the timeline server or not by RM. This configuration setting is + for ATS V2.</description> + <name>yarn.rm.system-metrics-publisher.emit-container-events</name> + <value>false</value> + </property> + <property> <description>Number of worker threads that send the yarn system metrics @@ -1954,6 +1962,13 @@ <value>${hadoop.tmp.dir}/yarn/timeline</value> </property> + <property> + <description>The setting that controls how often the timeline collector + flushes the timeline writer.</description> + <name>yarn.timeline-service.writer.flush-interval-seconds</name> + <value>60</value> + </property> + <!-- Shared Cache Configuration --> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 23ad4f4..e9f2085 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -19,6 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import com.google.common.annotations.VisibleForTesting; /** * Class that manages adding and removing collectors and their lifecycle. It @@ -47,7 +52,10 @@ public abstract class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); - protected TimelineWriter writer; + private TimelineWriter writer; + private ScheduledExecutorService writerFlusher; + private int flushInterval; + private boolean writerFlusherRunning; @Override public void serviceInit(Configuration conf) throws Exception { @@ -56,6 +64,12 @@ public abstract class TimelineCollectorManager extends AbstractService { FileSystemTimelineWriterImpl.class, TimelineWriter.class), conf); writer.init(conf); + // create a single dedicated thread for flushing the writer on a periodic + // basis + writerFlusher = Executors.newSingleThreadScheduledExecutor(); + flushInterval = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS); super.serviceInit(conf); } @@ -65,6 +79,10 @@ public abstract class TimelineCollectorManager extends AbstractService { if (writer != null) { writer.start(); } + // schedule the flush task + writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer), + flushInterval, flushInterval, TimeUnit.SECONDS); + writerFlusherRunning = true; } // access to this map is synchronized with the map itself @@ -161,9 +179,48 @@ public abstract class TimelineCollectorManager extends AbstractService { c.serviceStop(); } } + // stop the flusher first + if (writerFlusher != null) { + writerFlusher.shutdown(); + writerFlusherRunning = false; + if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) { + // in reality it should be ample time for the flusher task to finish + // even if it times out, writers may be able to handle closing in this + // situation fine + // proceed to close the writer + LOG.warn("failed to stop the flusher task in time. " + + "will still proceed to close the writer."); + } + } if (writer != null) { writer.close(); } super.serviceStop(); } + + @VisibleForTesting + boolean writerFlusherRunning() { + return writerFlusherRunning; + } + + /** + * Task that invokes the flush operation on the timeline writer. + */ + private static class WriterFlushTask implements Runnable { + private final TimelineWriter writer; + + public WriterFlushTask(TimelineWriter writer) { + this.writer = writer; + } + + public void run() { + try { + writer.flush(); + } catch (Throwable th) { + // we need to handle all exceptions or subsequent execution may be + // suppressed + LOG.error("exception during timeline writer flush!", th); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 34a6b7c..b22b39f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -127,6 +127,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService mkdirs(outputRoot, ENTITIES_DIR); } + @Override + public void flush() throws IOException { + // no op + } + private static String mkdirs(String... dirStrs) throws IOException { StringBuilder path = new StringBuilder(); for (String dirStr : dirStrs) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60..876ad6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return null; } + @Override + public void flush() throws IOException { + // flush all buffered mutators + entityTable.flush(); + } + /** * close the hbase connections The close APIs perform flushing and release any * resources held http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java index 5b4442c..381ff17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService } + @Override + public void flush() throws IOException { + // currently no-op + } + // Utility functions @Private @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 494e8ad..50136de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -70,4 +70,13 @@ public interface TimelineWriter extends Service { */ TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException; + + /** + * Flushes the data to the backend storage. Whatever may be buffered will be + * written to the storage when the method returns. This may be a potentially + * time-consuming operation, and should be used judiciously. + * + * @throws IOException + */ + void flush() throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/54eded54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 87343fd..0d69fbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -67,6 +67,11 @@ public class TestNMTimelineCollectorManager { } @Test + public void testStartingWriterFlusher() throws Exception { + assertTrue(collectorManager.writerFlusherRunning()); + } + + @Test public void testStartWebApp() throws Exception { assertNotNull(collectorManager.getRestServerBindAddress()); String address = collectorManager.getRestServerBindAddress();