Repository: tez
Updated Branches:
  refs/heads/branch-0.8 daf736929 -> 736996b92
TEZ-3291. Optimize splits grouping when locality information is not available (rbalamohan) (cherry picked from commit 1264d5d019c86ae5e26240693bfb8c0a81d44dcd) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/736996b9 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/736996b9 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/736996b9 Branch: refs/heads/branch-0.8 Commit: 736996b929afc040b43793664f4bfd46732e6070 Parents: daf7369 Author: Rajesh Balamohan <rbalamo...@apache.org> Authored: Tue Jun 21 18:01:46 2016 -0700 Committer: Rajesh Balamohan <rbalamo...@apache.org> Committed: Tue Jun 21 18:02:52 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/grouper/TezSplitGrouper.java | 69 ++++++++++++-------- .../hadoop/mapred/split/TestGroupedSplits.java | 51 +++++++++++++++ 3 files changed, 92 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/736996b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d30d962..ccf2f77 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3291. Optimize splits grouping when locality information is not available. TEZ-3305. TestAnalyzer fails on Hadoop 2.7. TEZ-3304. TestHistoryParser fails with Hadoop 2.7. TEZ-3296. 
Tez job can hang if two vertices at the same root distance have different task requirements http://git-wip-us.apache.org/repos/asf/tez/blob/736996b9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index 9435e68..26e5a9e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -182,6 +182,36 @@ public abstract class TezSplitGrouper { locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER; } + List<GroupedSplitContainer> groupedSplits = null; + String emptyLocation = "EmptyLocation"; + String localhost = "localhost"; + String[] emptyLocations = {emptyLocation}; + groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits); + + boolean allSplitsHaveLocalhost = true; + + long totalLength = 0; + Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); + // go through splits and add them to locations + for (SplitContainer split : originalSplits) { + totalLength += estimator.getEstimatedSize(split); + String[] locations = locationProvider.getPreferredLocations(split); + if (locations == null || locations.length == 0) { + locations = emptyLocations; + allSplitsHaveLocalhost = false; + } + for (String location : locations ) { + if (location == null) { + location = emptyLocation; + allSplitsHaveLocalhost = false; + } + if (!location.equalsIgnoreCase(localhost)) { + allSplitsHaveLocalhost = false; + } + distinctLocations.put(location, null); + } + } + if (! 
(configNumSplits > 0 || originalSplits.size() == 0)) { // numSplits has not been overridden by config @@ -189,10 +219,6 @@ public abstract class TezSplitGrouper { // there are splits generated // desired splits is less than number of splits generated // Do sanity checks - long totalLength = 0; - for (SplitContainer split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - } int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size(); long lengthPerGroup = totalLength/splitCount; @@ -223,19 +249,25 @@ public abstract class TezSplitGrouper { } else if (lengthPerGroup < minLengthPerGroup) { // splits too small to work. Need to override with size. int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1; + /** + * This is a workaround for systems like S3 that pass the same + * fake hostname for all splits. + */ + if (!allSplitsHaveLocalhost) { + desiredNumSplits = newDesiredNumSplits; + } + LOG.info("Desired splits: " + desiredNumSplits + " too large. 
" + " Desired splitLength: " + lengthPerGroup + " Min splitLength: " + minLengthPerGroup + " New desired splits: " + newDesiredNumSplits + + " Final desired splits: " + desiredNumSplits + + " All splits have localhost: " + allSplitsHaveLocalhost + " Total length: " + totalLength + " Original splits: " + originalSplits.size()); - - desiredNumSplits = newDesiredNumSplits; } } - List<GroupedSplitContainer> groupedSplits = null; - if (desiredNumSplits == 0 || originalSplits.size() == 0 || desiredNumSplits >= originalSplits.size()) { @@ -253,27 +285,6 @@ public abstract class TezSplitGrouper { return groupedSplits; } - String emptyLocation = "EmptyLocation"; - String[] emptyLocations = {emptyLocation}; - groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits); - - long totalLength = 0; - Map<String, LocationHolder> distinctLocations = createLocationsMap(conf); - // go through splits and add them to locations - for (SplitContainer split : originalSplits) { - totalLength += estimator.getEstimatedSize(split); - String[] locations = locationProvider.getPreferredLocations(split); - if (locations == null || locations.length == 0) { - locations = emptyLocations; - } - for (String location : locations ) { - if (location == null) { - location = emptyLocation; - } - distinctLocations.put(location, null); - } - } - long lengthPerGroup = totalLength/desiredNumSplits; int numNodeLocations = distinctLocations.size(); int numSplitsPerLocation = originalSplits.size()/numNodeLocations; http://git-wip-us.apache.org/repos/asf/tez/blob/736996b9/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index fba72a3..3dce417 100644 --- 
a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -484,6 +484,57 @@ public class TestGroupedSplits { } } } + + + @Test (timeout = 30000) + public void testS3Scenario() throws IOException { + //There can be multiple nodes in cluster, but locations would be "localhost" in s3 + String[] locations = {"localhost"}; + int oriSplits = 52; + int desiredSplits = 19; + long splitLength = 231958; + + InputSplit[] origSplits = new InputSplit[oriSplits]; + + for (int i = 0; i < oriSplits; i++) { + String[] splitLoc = locations; + origSplits[i] = new TestInputSplit(splitLength, splitLoc, i); + } + + TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper(); + JobConf conf = new JobConf(defaultConf); + conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build(); + + //Create splits now + InputSplit[] groupedSplits = + grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat"); + + //Verify + int splitsInGroup = oriSplits / desiredSplits; + int totalSplits = (int) Math.ceil(oriSplits * 1.0 / splitsInGroup); + assertEquals(totalSplits, groupedSplits.length); + + + // min split optimization should not be invoked if any location is not localhost + String[] nonLocalLocations = { "EmptyLocation", "localhost" }; + + origSplits = new InputSplit[oriSplits]; + + for (int i = 0; i < oriSplits; i++) { + String[] splitLoc = nonLocalLocations; + origSplits[i] = new TestInputSplit(splitLength, splitLoc, i); + } + + grouper = new TezMapredSplitsGrouper(); + conf = new JobConf(defaultConf); + conf = (JobConf) TezSplitGrouper.newConfigBuilder(conf).build(); + + //Create splits now + groupedSplits = grouper.getGroupedSplits(conf, origSplits, desiredSplits, "SampleFormat"); + + //splits should be 1 + assertEquals(1, groupedSplits.length); + } @SuppressWarnings({ "rawtypes", "unchecked" }) @Test(timeout=10000)