[ https://issues.apache.org/jira/browse/MAPREDUCE-728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12728382#action_12728382 ]

Arun C Murthy commented on MAPREDUCE-728:
-----------------------------------------

h2. Mumak 1.0

The goal is to build a discrete event simulator to simulate how a Hadoop 
Map-Reduce Scheduler performs on a large-scale Map-Reduce cluster running a 
specific workload.

Mumak takes as input a reasonably large workload (e.g. a month's worth of jobs 
from production cluster(s)) and simulates it in a matter of hours, if not 
minutes, on very few machines.

h4. What is it not?

It is a non-goal to simulate the actual map/reduce tasks themselves.

The scope of Version 1.0 does not include simulating the actual workload 
itself. It will merely take a digest of the Hadoop Map-Reduce JobHistory of 
all jobs in the workload and take the run-time of individual tasks directly 
from the digest, without simulating the tasks themselves. Clearly this will 
not attempt to simulate resources and their utilization on the actual 
tasktrackers, interactions between tasks running on the tasktrackers, etc. The 
simulation of individual tasks is left for future versions.

Some other simplifications are also made (mainly due to the lack of such 
information in the job trace):

    * No job dependencies. Jobs are submitted to the cluster exactly as 
defined in the job trace.
    * No modeling of failure correlations (e.g. a few task attempts fail due 
to a node failure, but in the simulation run the same set of task attempts may 
run on different nodes). 

h4. What goes in? What comes out?

The 'workload' alluded to in the previous sections needs elaboration. The 
proposal is to use the job-history of all jobs which are part of the workload. 
The Hadoop Map-Reduce per-job job-history is a very detailed log of each 
component task, with run-times, counters, etc. We can use this to generate a 
per-job digest with all relevant information. Thus, it is quite sufficient and 
feasible to collect workloads from different clusters (research, production, 
etc.) and then use them during simulation.

More specifically, the following is a list of details it simulates:

    * It would simulate a cluster of the same size and network topology as the 
one the source trace comes from. The reason for this restriction is that data 
locality is an important factor in scheduling decisions, and scaling a job 
trace obtained from cluster A to simulate it on cluster B with a different 
diameter would require a much more thorough understanding.
    * It would simulate failures faithfully as recorded in the job trace. 
Namely, if a particular task attempt fails in the trace, it would fail in the 
simulation.
    * It would replay the same number of map tasks and reduce tasks as 
specified in the job digest.
    * It would use the inputsplit locations as recorded in the job trace. 

The simulator will generate job-history in the same format for each of the 
simulated jobs. Thus we can use the same tools for slicing and dicing the 
output of the simulator.

h4. Design & Architecture

Design Goals

An overarching design goal for Mumak is that we should be able to use the 
exact same Map-Reduce Schedulers (listed above) as-is, without any changes. 
This implies that we use the same interfaces used by Hadoop Map-Reduce, so 
that it is trivial to plug in the Scheduler of interest.

Along the same lines, it is a legitimate goal to use all relevant Hadoop 
Map-Reduce interfaces between the various components, so that it is trivial to 
replace each with the appropriate Hadoop Map-Reduce component (e.g. to run the 
simulator in an emulation mode with real Map-Reduce clusters in the future).

Architecture

Mumak consists of the following components:

    * Discrete Event Simulator Engine with an event-queue
    * Simulated JobTracker
    * Simulated Cluster (set of tasktrackers)
    * Client for handling job-submission 

Engine

The Simulator Engine is the heart of Mumak. It manages all the discrete events 
in virtual time and fires the appropriate handlers (JobClient, TaskTracker) 
when the events occur. Typically, each event handled by a component results in 
a new set of events to be fired in the future (in virtual time); a minimal 
sketch of such an event loop follows the list of event types below.

Some of the various event-types are:

    * HeartbeatEvent - An event which instructs a specific TaskTracker to send 
a heartbeat to the JobTracker.
    * TaskCompletionEvent - An event, sent to the TaskTracker, which denotes 
the completion (success/failure) of a specific map or reduce task.
    * JobSubmissionEvent - An event which instructs the JobClient to submit a 
specific job to the JobTracker.
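
To make the Engine's contract concrete, here is a minimal sketch of a 
virtual-time event loop of the kind described above. The class and method 
names are illustrative only; they are not the actual Mumak classes.

{code:java}
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// An event scheduled at some point in virtual time. Handling it may produce
// new events scheduled further in the future.
abstract class SimulatorEvent {
  final long when;                        // virtual time at which this event fires
  SimulatorEvent(long when) { this.when = when; }
  abstract List<SimulatorEvent> fire();   // handle the event, return follow-up events
}

// The engine: a priority queue of events ordered by virtual time.
class SimulatorEngine {
  private final PriorityQueue<SimulatorEvent> queue =
      new PriorityQueue<>(Comparator.comparingLong((SimulatorEvent e) -> e.when));
  private long now = 0;                   // current virtual time

  void schedule(SimulatorEvent event) { queue.add(event); }

  // Drain the queue in virtual-time order until no events remain.
  void run() {
    while (!queue.isEmpty()) {
      SimulatorEvent event = queue.poll();
      now = event.when;                   // advance virtual time to this event
      queue.addAll(event.fire());         // handlers enqueue their follow-up events
    }
  }

  long currentTime() { return now; }
}
{code}

The event-types listed above would simply be concrete subclasses of such an 
event, each dispatching to the appropriate simulated component.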

Simulated JobTracker

The JobTracker is the driver for the Map-Reduce Scheduler. On receipt of 
heartbeats from various TaskTrackers it 'tracks' the progress of the current 
jobs and forwards the appropriate information to the Scheduler to allow it to 
make task-scheduling decisions. The simulated JobTracker uses the virtual time 
to allow the scheduler to make scheduling decisions.

The JobTracker also uses the per-job digest to fill in the expected runtime 
for each of the tasks scheduled by the Scheduler, which is how Mumak simulates 
the run-time of each task.

The JobTracker is purely reactive in the sense that it only reacts to 
heartbeats sent by TaskTrackers. Furthermore, it does not directly handle any 
events from the Engine; it only responds to InterTrackerProtocol.heartbeat 
calls, as in the real world.

Simulated Cluster

The simulated cluster consists of an appropriate number of simulated 
TaskTrackers which respond to events generated by the Engine. Each simulated 
TaskTracker maintains state about currently running tasks (all tasks are 
'running' till an appropriate TaskCompletionEvent fires) and sends periodic 
status updates to the JobTracker on receipt of a HeartbeatEvent.

HeartbeatEvent

When a HeartbeatEvent fires, the appropriate TaskTracker builds status-reports 
for each of the running tasks and sends a heartbeat to the JobTracker 
(InterTrackerProtocol.heartbeat). The JobTracker updates its data-structures 
(JobInProgress, TaskInProgress etc.) to reflect the latest state and forwards 
that information to the Scheduler. If any new tasks are to be scheduled on 
this TaskTracker, the JobTracker also fills in the expected run-time for each 
via information gleaned from the job-digest. The TaskTracker then processes 
the instructions to launch the new tasks and responds to the Engine by 
inserting a set of new TaskCompletionEvents for the new tasks into the 
EventQueue.

TaskCompletionEvent

When a TaskCompletionEvent fires, the appropriate TaskTracker marks the 
relevant task as complete and forwards that information to the JobTracker on 
the next HeartbeatEvent.
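
Putting the two events together, here is a minimal sketch of how a simulated 
TaskTracker might handle them, assuming it is driven by an engine like the one 
sketched earlier. LaunchTaskAction and HeartbeatHandler are simplified 
stand-ins for Hadoop's InterTrackerProtocol types, not the actual classes; and 
for brevity the tracker returns completion times for the Engine to schedule, 
rather than inserting the TaskCompletionEvents itself.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified instruction from the JobTracker: launch this task attempt,
// with its expected runtime already filled in from the job digest.
class LaunchTaskAction {
  final String attemptId;
  final long expectedRuntime;
  LaunchTaskAction(String attemptId, long expectedRuntime) {
    this.attemptId = attemptId;
    this.expectedRuntime = expectedRuntime;
  }
}

// Simplified stand-in for the heartbeat side of InterTrackerProtocol.
interface HeartbeatHandler {
  List<LaunchTaskAction> heartbeat(String trackerName, List<String> runningAttempts);
}

class SimulatedTaskTracker {
  private final String name;
  private final long heartbeatInterval;
  // Attempts currently "running": attempt id -> virtual finish time.
  private final Map<String, Long> running = new HashMap<>();

  SimulatedTaskTracker(String name, long heartbeatInterval) {
    this.name = name;
    this.heartbeatInterval = heartbeatInterval;
  }

  // HeartbeatEvent handler: report running attempts to the JobTracker, accept
  // new tasks, and return the virtual times at which their
  // TaskCompletionEvents should fire.
  Map<String, Long> onHeartbeat(long now, HeartbeatHandler jobTracker) {
    Map<String, Long> completionTimes = new HashMap<>();
    for (LaunchTaskAction action :
        jobTracker.heartbeat(name, new ArrayList<>(running.keySet()))) {
      long finish = now + action.expectedRuntime;
      running.put(action.attemptId, finish);
      completionTimes.put(action.attemptId, finish);
    }
    return completionTimes;
  }

  // Virtual time at which this tracker's next HeartbeatEvent should fire.
  long nextHeartbeatTime(long now) { return now + heartbeatInterval; }

  // TaskCompletionEvent handler: the attempt stops "running"; its final
  // status reaches the JobTracker on the next heartbeat.
  void onTaskCompletion(String attemptId) {
    running.remove(attemptId);
  }
}
{code}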

Simulated JobClient

The JobClient responds to JobSubmissionEvents sent by the Engine and submits 
the appropriate jobs to the JobTracker via the standard JobSubmissionProtocol.
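
A minimal sketch of that behaviour follows; TraceJob and JobSubmitter are 
hypothetical stand-ins for a rumen digest record and the real 
JobSubmissionProtocol respectively.

{code:java}
// Hypothetical digest record for a job in the trace.
class TraceJob {
  final String jobId;
  final long submitTime;                 // virtual submission time from the trace
  TraceJob(String jobId, long submitTime) {
    this.jobId = jobId;
    this.submitTime = submitTime;
  }
}

// Simplified stand-in for the real JobSubmissionProtocol.
interface JobSubmitter {
  void submitJob(String jobId);
}

class SimulatedJobClient {
  private final JobSubmitter jobTracker;

  SimulatedJobClient(JobSubmitter jobTracker) { this.jobTracker = jobTracker; }

  // JobSubmissionEvent handler: the event fires at job.submitTime and hands
  // the traced job to the JobTracker.
  void onJobSubmission(TraceJob job) {
    jobTracker.submitJob(job.jobId);
  }
}
{code}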

h4. Relevant Details

Job Summary for Simulation

The following can be derived from the job-history files by rumen:

    * Detailed job trace with properties and counters of each task attempt (of 
each task of each job in a workload).
    * Digest of jobs in a workload. From the jobs in the workload, we can 
derive statistical information about tasks to build a model which can help us 
fabricate tasks that were never even scheduled to run (e.g. tasks of a failed 
job which were never run since the job was declared FAILED soon after 
submission). Along the same lines, the digest will also have statistical 
details to help model run-times for data-local maps, rack-local maps and 
off-rack maps, based on data in the job-history logs (see the sketch after 
this list). This is necessary for simulating tasks which the scheduler might 
place on different nodes in the simulation run. 
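
The locality-sensitive run-time model could look roughly like the sketch 
below; the per-locality Gaussian and all names are assumptions made for 
illustration, not the actual digest format.

{code:java}
import java.util.Random;

// Locality of a map attempt relative to its input split.
enum Locality { DATA_LOCAL, RACK_LOCAL, OFF_RACK }

// Per-workload digest entry: map run-time statistics broken down by locality.
class MapRuntimeModel {
  private final double[] meanMs   = new double[Locality.values().length];
  private final double[] stddevMs = new double[Locality.values().length];
  private final Random random = new Random();

  void setStats(Locality locality, double mean, double stddev) {
    meanMs[locality.ordinal()] = mean;
    stddevMs[locality.ordinal()] = stddev;
  }

  // Fabricate a run-time for a map attempt with the given locality, e.g. for
  // attempts the scheduler places on different nodes than in the original
  // run, or for tasks that never ran in the trace.
  long sampleRuntimeMs(Locality locality) {
    int i = locality.ordinal();
    double sample = meanMs[i] + stddevMs[i] * random.nextGaussian();
    return Math.max(1L, Math.round(sample));
  }
}
{code}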

How to deal with failure in workload?

We will try to faithfully model task failures by replaying failed 
task-attempts using information in the detailed job-traces.

We also plan to build a simple statistical model of task failures which can 
then be used to simulate tasks that were never scheduled because the job 
failed early, etc.
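
As a toy illustration, such a model could be as simple as a per-attempt 
failure probability estimated from the digest; this is an assumption for 
illustration, not a committed design.

{code:java}
import java.util.Random;

// Toy Bernoulli model of task-attempt failure, estimated from the workload digest.
class TaskFailureModel {
  private final double attemptFailureRate;
  private final Random random = new Random();

  TaskFailureModel(double attemptFailureRate) {
    this.attemptFailureRate = attemptFailureRate;
  }

  // Decide whether a fabricated attempt (one never run in the trace) fails.
  boolean attemptFails() {
    return random.nextDouble() < attemptFailureRate;
  }
}
{code}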

Simulating Reduce Tasks

In Mumak 1.0 we do not plan to simulate the running of the actual map/reduce 
tasks. Given that, it is not possible to simulate the implicit dependency 
between the completion of maps, the shuffle phase, and the start of the 
reduce-phase of the reduce tasks. Hence, we have decided to use a special 
AllMapsFinished event, generated by the SimulatedJobTracker, to trigger the 
start of the reduce-phase. For the same reasons, we have to model the total 
runtime of a reduce task as the sum of the time taken for all maps to complete 
and the time taken by the individual task to complete the reduce-phase by 
itself. Thus, we are not going to try to model the shuffle phase accurately.

Furthermore, we will ignore map-task failures due to failed shuffles since we 
are not simulating the shuffle-phase.
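
In code, the reduce-task timing model described above amounts to something 
like the following sketch; the class and field names are illustrative only.

{code:java}
// Sketch of the reduce-task timing model; the shuffle is folded into the wait
// for maps and not modelled separately.
class SimulatedReduceTask {
  private final long reducePhaseRuntime;  // reduce-phase time, from the job digest
  private long startTime = -1;            // set when AllMapsFinished fires for the job

  SimulatedReduceTask(long reducePhaseRuntime) {
    this.reducePhaseRuntime = reducePhaseRuntime;
  }

  // Called when the SimulatedJobTracker fires AllMapsFinished for the job.
  void onAllMapsFinished(long virtualTime) {
    startTime = virtualTime;
  }

  // Finish time = time at which all maps completed + the reduce-phase itself.
  long finishTime() {
    if (startTime < 0) {
      throw new IllegalStateException("AllMapsFinished has not fired yet");
    }
    return startTime + reducePhaseRuntime;
  }
}
{code}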

----

Thoughts?


> Mumak: Map-Reduce Simulator
> ---------------------------
>
>                 Key: MAPREDUCE-728
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-728
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>            Reporter: Arun C Murthy
>            Assignee: Arun C Murthy
>             Fix For: 0.21.0
>
>
> h3. Vision:
> We want to build a Simulator to simulate large-scale Hadoop clusters, 
> applications and workloads. This would be invaluable in furthering Hadoop by 
> providing a tool for researchers and developers to prototype features (e.g. 
> pluggable block-placement for HDFS, Map-Reduce schedulers etc.) and predict 
> their behaviour and performance with a reasonable amount of confidence, 
> thereby aiding rapid innovation.
> ----
> h3. First Cut: Simulator for the Map-Reduce Scheduler
> The Map-Reduce Scheduler is a fertile area of interest with at least four 
> schedulers, each with their own set of features, currently in existence: 
> Default Scheduler, Capacity Scheduler, Fairshare Scheduler & Priority 
> Scheduler.
> Each scheduler's scheduling decisions are driven by many factors, such as 
> fairness, capacity guarantee, resource availability, data-locality etc.
> Given that, it is non-trivial to accurately choose a single scheduler or even 
> a set of desired features to predict the right scheduler (or features) for a 
> given workload. Hence a simulator which can predict how well a particular 
> scheduler works for some specific workload by quickly iterating over 
> schedulers and/or scheduler features would be quite useful.
> So, the first cut is to implement a simulator for the Map-Reduce scheduler 
> which takes as input a job trace derived from a production workload and a 
> cluster definition, and simulates the execution of the jobs as defined in 
> the trace on this virtual cluster. As output, the detailed job-execution 
> trace (recorded in relation to virtual simulated time) could then be 
> analyzed to understand various traits of individual schedulers (individual 
> jobs' turnaround time, throughput, fairness, capacity guarantee, etc). To 
> support this, we would need a simulator which could accurately model the 
> conditions of the actual system which would affect a scheduler's decisions. 
> These include very large-scale clusters (thousands of nodes), the detailed 
> characteristics of the workload thrown at the clusters, job or task 
> failures, data locality, and cluster hardware (cpu, memory, disk i/o, 
> network i/o, network topology) etc.
