Dynamic information fed into Hadoop to control execution of a submitted job
---------------------------------------------------------------------------
Key: MAPREDUCE-1928
URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
Project: Hadoop Map/Reduce
Issue Type: New Feature
Reporter: Raman Grover
Currently the job submission protocol requires the job provider to supply every
piece of information up front inside an instance of JobConf. The submitted
information includes the input data (HDFS paths), the anticipated resource
requirements, the number of reducers, etc. This information is read by the
JobTracker as part of job initialization. Once initialized, the job moves into
the running state, and from that point there is no mechanism for feeding any
additional information into the Hadoop infrastructure to control the job's
execution.
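For reference, this is roughly what the current, fully static submission looks
like: every parameter the framework will ever act on is fixed in JobConf before
any input is read (paths and values below are illustrative, and the
mapper/reducer classes are omitted for brevity):
{code:java}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class StaticSubmissionExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(StaticSubmissionExample.class);
    conf.setJobName("static-example");
    // Everything the framework will ever know about the job is fixed here,
    // before a single byte of input has been read.
    FileInputFormat.setInputPaths(conf, new Path("/data/input"));    // illustrative path
    FileOutputFormat.setOutputPath(conf, new Path("/data/output"));  // illustrative path
    conf.setNumReduceTasks(10);                                      // fixed up front
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);
    JobClient.runJob(conf);  // after this call, no further control input is accepted
  }
}
{code}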
From this point the execution pattern of the job is essentially static. Using
the size of the input data and a few settings inside JobConf, the number of
mappers is computed, and Hadoop attempts to read all of the input in parallel
by launching that many map tasks. Once the map phase is over, a known number of
reduce tasks (supplied as part of JobConf) are started.
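(For example, the number of map tasks is essentially fixed at initialization
time by the input size and the configured split limits; a simplified sketch of
the computation, modeled on FileInputFormat and using illustrative names:)
{code:java}
// Simplified: one map task per input split, so the map-task count is determined
// entirely by the input size and split-size settings known at submission time.
static long estimateNumMapTasks(long totalInputBytes, long minSplitSize,
                                long maxSplitSize, long dfsBlockSize) {
  long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, dfsBlockSize));
  return (totalInputBytes + splitSize - 1) / splitSize;  // ceiling division
}
{code}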
The parameters that control the job's execution were thus all set in JobConf
before the input data was ever read. Yet as the map phase progresses, useful
information based upon the content of the input data surfaces and could be used
to control the further execution of the job. Let us walk through some examples
where additional information could be fed to Hadoop after job submission for a
more optimal execution of the job.
1) "Process a part of the input , based upon the results decide if reading more
input is required "
In a huge data set, user is interested in finding 'k' records that satisfy
a predicate, essentially sampling the data. In current implementation, as the
data is huge, a large no of mappers would be launched consuming a significant
fraction of the available map slots in the cluster. Each map task would attempt
at emitting a max of 'k' records. With N mappers, we get N*k records out of
which one can pick any k to form the final result.
This is not optimal because:
    (i) A large number of map slots gets occupied right away, affecting other
jobs in the queue.
    (ii) If the predicate is not very selective (i.e. matching records are
plentiful), we did not need to scan all of the data to form our result. We
could have finished by reading a fraction of the input, monitoring the
cardinality of the map output, and determining whether more input needed to be
processed.
Optimal way: if reading the whole input requires N mappers, launch only 'M' of
them initially and allow them to complete. Based upon the statistics collected,
decide how many additional mappers to launch next, and so on, until either the
whole input has been processed or enough records have been collected to form
the result, whichever comes first (see the sketch below).
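As a rough sketch of the intended behaviour, here is how a job provider can
approximate it today with a client-side driver that submits the input in
batches of files and stops as soon as enough matching records have been found.
The predicate, paths, counter names and batch size are illustrative
assumptions; the point of the proposal is that the framework could do this
within a single job instead of forcing a chain of small jobs:
{code:java}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IncrementalSamplingDriver {

  // Map-only sampling: emit records that satisfy the predicate and count them.
  public static class SamplingMapper
      extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      if (value.toString().contains("ERROR")) {  // illustrative predicate
        context.write(value, NullWritable.get());
        context.getCounter("sampling", "MATCHED").increment(1);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    long k = 1000;       // number of matching records wanted
    int batchSize = 20;  // files processed per round, instead of the whole input at once

    FileSystem fs = FileSystem.get(conf);
    FileStatus[] files = fs.listStatus(new Path("/data/input"));  // illustrative path

    long matched = 0;
    for (int start = 0, round = 0;
         start < files.length && matched < k;
         start += batchSize, round++) {
      Job job = Job.getInstance(conf, "sample-round-" + round);
      job.setJarByClass(IncrementalSamplingDriver.class);
      job.setMapperClass(SamplingMapper.class);
      job.setNumReduceTasks(0);  // map-only
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);
      for (int i = start; i < Math.min(start + batchSize, files.length); i++) {
        FileInputFormat.addInputPath(job, files[i].getPath());
      }
      FileOutputFormat.setOutputPath(job, new Path("/data/sample/round-" + round));
      job.waitForCompletion(true);
      // Statistics collected from the completed round drive the decision to continue.
      matched += job.getCounters().findCounter("sampling", "MATCHED").getValue();
    }
  }
}
{code}
With framework support, the statistics would be the counters the JobTracker
already collects, and the decision to schedule further map tasks would happen
inside the same running job.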
II) "Here is some data, the remaining is yet to arrive, but you may start with
it, and receive more input later"
Consider a chain of 2 M-R jobs chained together such that the latter reads
the output of the former. The second MR job cannot be started until the first
has finished completely. This is essentially because Hadoop needs to be told
the complete information about the input before beginning the job.
The first M-R has produced enough data ( not finished yet) that can be
processed by another MR job and hence the other MR need not wait to grab the
whole of input before beginning. Input splits could be supplied later , but
ofcourse before the copy/shuffle phase.
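To make the idea concrete, the hook being proposed might look something like
the interface below; this does not exist in Hadoop today and is purely an
illustration:
{code:java}
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;

// Hypothetical extension: a handle to a job that is already running, through
// which the job provider can feed additional input and eventually declare the
// input closed.
public interface DynamicInputChannel {

  // Supply further splits to the running job; new map tasks get scheduled for them.
  void addInputSplits(List<InputSplit> splits) throws IOException;

  // Declare that no more input will arrive, so the copy/shuffle and reduce
  // phases can wait for exactly the map tasks scheduled so far and then proceed.
  void closeInput() throws IOException;
}
{code}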
III) " Input data has undergone one round of processing by map phase, have
some stats, can now say of the resources
required further"
Mappers can produce useful stats about of their output, like the
cardinality or produce a histogram describing distribution of output . These
stats are available to the job provider (Hive/Pig/End User) who can
now determine with better accuracy of the resources (memory requirements
) required in reduction phase, and even the number of reducers or may even
alter the reduction logic by altering the reducer class parameter.
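Map tasks can already publish such statistics through user-defined counters;
what is missing is a supported way to act on them while the job is still
running. A minimal sketch of the publishing side (the counter names and key
extraction are illustrative):
{code:java}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: a mapper that publishes simple statistics about its output via counters.
public class StatsEmittingMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String outKey = value.toString().split("\t", 2)[0];  // illustrative key extraction
    context.write(new Text(outKey), new LongWritable(1));
    // Statistics the job provider could read while the map phase is still running,
    // e.g. output cardinality; a histogram could be published the same way.
    context.getCounter("stats", "MAP_OUTPUT_RECORDS").increment(1);
  }
}
{code}
With the proposed feature, the job provider would read such counters (or a
richer histogram) while the map phase is in progress and, through a control
call that does not exist today, override settings such as mapred.reduce.tasks
or the reducer class before the reduce phase begins.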
In a nutshell, certain parameters of a job are governed by the input data and
the intermediate results produced, and hence need to be overridden as the job
progresses. Hadoop currently does not allow such information to be fed in
dynamically, so job execution may not always be optimal.
I would like to get feedback from the Hadoop community about the above
proposal, and to learn whether any similar effort is already underway. If we
agree, as a next step I would like to discuss the implementation details that I
have worked out end-to-end.