[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13835200#comment-13835200
 ] 

Rajesh Balamohan commented on MAPREDUCE-5611:
---------------------------------------------

Just wanted to add the response times as well

Without Patch : 289 seconds
With Patch: 219 seconds

This testing was carried out with with Hive 0.10

> CombineFileInputFormat creates more rack-local tasks due to less split 
> location info.
> -------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-5611
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5611
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>    Affects Versions: trunk
>            Reporter: Chandra Prakash Bhagtani
>            Assignee: Chandra Prakash Bhagtani
>             Fix For: trunk
>
>         Attachments: CombineFileInputFormat-trunk.patch
>
>
> I have come across an issue with CombineFileInputFormat. Actually I ran a 
> hive query on approx 1.2 GB data with CombineHiveInputFormat which internally 
> uses CombineFileInputFormat. My cluster size is 9 datanodes and 
> max.split.size is 256 MB
> When I ran this query with replication factor 9, hive consistently creates 
> all 6 rack-local tasks and with replication factor 3 it creates 5 rack-local 
> and 1 data local tasks. 
>  When replication factor is 9 (equal to cluster size), all the tasks should 
> be data-local as each datanode contains all the replicas of the input data, 
> but that is not happening i.e all the tasks are rack-local. 
> When I dug into CombineFileInputFormat.java code in getMoreSplits method, I 
> found the issue with the following snippet (specially in case of higher 
> replication factor)
> {code:title=CombineFileInputFormat.java|borderStyle=solid}
> for (Iterator<Map.Entry<String,
>          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
>          iter.hasNext();) {
>        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
>       nodes.add(one.getKey());
>       List<OneBlockInfo> blocksInNode = one.getValue();
>       // for each block, copy it into validBlocks. Delete it from
>       // blockToNodes so that the same block does not appear in
>       // two different splits.
>       for (OneBlockInfo oneblock : blocksInNode) {
>         if (blockToNodes.containsKey(oneblock)) {
>           validBlocks.add(oneblock);
>           blockToNodes.remove(oneblock);
>           curSplitSize += oneblock.length;
>           // if the accumulated split size exceeds the maximum, then
>           // create this split.
>           if (maxSize != 0 && curSplitSize >= maxSize) {
>             // create an input split and add it to the splits array
>             addCreatedSplit(splits, nodes, validBlocks);
>             curSplitSize = 0;
>             validBlocks.clear();
>           }
>         }
>       }
> {code}
> First node in the map nodeToBlocks has all the replicas of input file, so the 
> above code creates 6 splits all with only one location. Now if JT doesn't 
> schedule these tasks on that node, all the tasks will be rack-local, even 
> though all the other datanodes have all the other replicas.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to