Design/implement a general log-aggregation framework for Hadoop
---------------------------------------------------------------
Key: HADOOP-2206
URL: https://issues.apache.org/jira/browse/HADOOP-2206
Project: Hadoop
Issue Type: New Feature
Reporter: Arun C Murthy
Assignee: Arun C Murthy
Fix For: 0.16.0
I'd like to propose a log-aggregation framework which facilitates collection,
aggregation and storage of the logs of the Hadoop Map-Reduce framework and
user-jobs in HDFS. Clearly the design/implementation of this framework is
heavily influenced and limited by Hadoop itself for e.g. lack of appends, not
too many small files (think: stdout/stderr/syslog of each map/reduce task) and
so on.
This framework will be especially useful once HoD (HADOOP-1301) is used to
provision dynamic, per-user, Map-Reduce clusters.
h4. Requirements:
* Store the various logs to a configurable location in the Hadoop Distributed
FileSystem
** User task logs (stdout, stderr, syslog)
** Map-Reduce daemons' logs (JobTracker and TaskTracker)
* Integrate well with Hadoop and ensure no adverse performance impact on the
Map-Reduce framework.
* It must not use a HDFS file (or more!) per a task, which would swamp the
NameNode capabilities.
* The aggregation system must be distributed and reliable.
* Facilities/tools to read the aggregated logs.
* The aggregated logs should be compressed.
h4. Architecture:
Here is a high-level overview of the log-aggregation framework:
h5. Logging
* Provision a cloud of log-aggregators in the cluster (outside of the Hadoop
cluster, running on the subset of nodes in the cluster). Lets call each one in
the cloud as a Log Aggregator i.e. LA.
* Each LA writes out 2 files per Map-Reduce cluster: an index file and a data
file. The LA maintains one directory per Map-Reduce cluster on HDFS.
* The index file format is simple:
** streamid (_streamid_ is either daemon identifier e.g.
tasktracker_foo.bar.com:57891 or $jobid-$taskid-(stdout|stderr|syslog) or
individual task-logs)
** timestamp
** logs-data start offset
** no. of bytes
* Each Hadoop daemon (NN/DN/JT/TT) is given the entire list of LAs in the
cluster.
* Each daemon picks one LA (at random) from the list, opens an exclusive stream
with the LA after identifying itself (i.e. ${daemonid}) and sends it's logs. In
case of error/failure to log it just connects to another LA as above and starts
logging to it.
* The logs are sent to the LA by a new log4j appender. The appender provides
some amount of buffering on the client-side.
* Implement a feature in the TaskTracker which lets it use the same appender to
send out the userlogs (stdout/stderr/syslog) to the LA after task completion.
This is important to ensure that logging to the LA at runtime doesn't hurt the
task's performance (see HADOOP-1553). The TaskTracker picks an LA per task in a
manner similar to the one it uses for it's own logs, identifies itself
(<${jobid}, ${taskid}, {stdout|stderr|syslog}>) and streams the entire task-log
at one go. In fact we can pick different LAs for each of the task's stdout,
stderr and syslog logs - each an exclusive stream to a single LA.
* The LA buffers some amount of data in memory (say 16K) and then flushes that
data to the HDFS file (per LA per cluster) after writing out an entry to the
index file.
* The LA periodically purges old logs (monthly, fortnightly or weekly as
today).
h5. Getting the logged information
The main requirement is to implement a simple set of tools to query the LA
(i.e. the index/data files on HDFS) to glean the logged information.
If we can think of each Map-Reduce cluster's logs as a set of archives (i.e.
one file per cluster per LA used) we need the ability to query the log-archive
to figure out the available streams and the ability to get one entire stream or
a subset of time based on timestamp-ranges. Essentially these are simple tools
which parse the index files of each LA (for a given Hadoop cluster) and return
the required information.
h6. Query for available streams
The query just returns all the available streams in an cluster-log archive
identified by the HDFS path.
It looks something like this for a cluster with 3 nodes which ran 2 jobs, first
of which had 2 maps, 1 reduce and the second had 1 map, 1 reduce:
{noformat}
$ la -query /log-aggregation/cluster-20071113
Available streams:
jobtracker_foo.bar.com:57893
tasktracker_baz.bar.com:57841
tasktracker_fii.bar.com:57891
job_20071113_0001-task_20071113_0001_m_000000_0-stdout
job_20071113_0001-task_20071113_0001_m_000000_0-stderr
job_20071113_0001-task_20071113_0001_m_000000_0-syslog
job_20071113_0001-task_20071113_0001_m_000001_0-stdout
job_20071113_0001-task_20071113_0001_m_000001_0-stderr
job_20071113_0001-task_20071113_0001_m_000001_0-syslog
job_20071113_0001-task_20071113_0001_r_000000_0-stdout
job_20071113_0001-task_20071113_0001_r_000000_0-stderr
job_20071113_0001-task_20071113_0001_r_000000_0-syslog
job_20071113_0001-task_20071113_0001_m_000000_0-stdout
job_20071113_0001-task_20071113_0002_m_000000_0-stderr
job_20071113_0001-task_20071113_0002_m_000000_0-syslog
job_20071113_0001-task_20071113_0002_m_000001_0-stdout
job_20071113_0001-task_20071113_0002_m_000001_0-stderr
job_20071113_0001-task_20071113_0002_m_000001_0-syslog
job_20071113_0001-task_20071113_0002_r_000000_0-stdout
job_20071113_0001-task_20071113_0002_r_000000_0-stderr
job_20071113_0001-task_20071113_0002_r_000000_0-syslog
{noformat}
h6. Get logged information per stream
The framework also offers the ability to query and fetch the actual log-data,
per-stream for a given timestamp-range. It looks something like:
{noformat}
$ la -fetch -daemon jt -range <t1:t2> /log-aggregation/cluster-20071113
$ la -fetch -daemon tt1 /log-aggregation/cluster-20071113
$ la -fetch -jobid <jobid> -taskid <taskid> -log <out|err|sys> -range
<t1:t2> /log-aggregation/cluster-20071113
{noformat}
Thoughts?
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.