Author: szehon Date: Mon Nov 17 19:57:12 2014 New Revision: 1640218 URL: http://svn.apache.org/r1640218 Log: HIVE-8892 : Use MEMORY_AND_DISK for RDD caching [Spark Branch] (Jimmy Xiang via Szehon)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1640218&r1=1640217&r2=1640218&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Mon Nov 17 19:57:12 2014 @@ -35,19 +35,19 @@ import scala.Tuple2; public class MapInput implements SparkTran<WritableComparable, Writable, WritableComparable, Writable> { private JavaPairRDD<WritableComparable, Writable> hadoopRDD; - private StorageLevel storageLevel; + private boolean toCache; public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) { - this(hadoopRDD, null); + this(hadoopRDD, false); } - public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, StorageLevel level) { + public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) { this.hadoopRDD = hadoopRDD; - setStorageLevel(level); + this.toCache = toCache; } - public void setStorageLevel(StorageLevel level) { - storageLevel = level; + public void setToCache(boolean toCache) { + this.toCache = toCache; } @Override @@ -55,8 +55,8 @@ public class MapInput implements SparkTr JavaPairRDD<WritableComparable, Writable> input) { Preconditions.checkArgument(input == null, "AssertionError: MapInput doesn't take any input"); - return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? hadoopRDD : - hadoopRDD.mapToPair(new CopyFunction()).persist(storageLevel); + return toCache ? hadoopRDD.mapToPair( + new CopyFunction()).persist(StorageLevel.MEMORY_AND_DISK()) : hadoopRDD; } private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1640218&r1=1640217&r2=1640218&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Mon Nov 17 19:57:12 2014 @@ -26,21 +26,21 @@ import org.apache.spark.storage.StorageL public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> { private final SparkShuffler shuffler; private final int numOfPartitions; - private final StorageLevel storageLevel; + private final boolean toCache; public ShuffleTran(SparkShuffler sf, int n) { - this(sf, n, null); + this(sf, n, false); } - public ShuffleTran(SparkShuffler sf, int n, StorageLevel level) { + public ShuffleTran(SparkShuffler sf, int n, boolean toCache) { shuffler = sf; numOfPartitions = n; - storageLevel = level; + this.toCache = toCache; } @Override public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) { JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions); - return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? result : result.persist(storageLevel); + return toCache ? result.persist(StorageLevel.MEMORY_AND_DISK()) : result; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1640218&r1=1640217&r2=1640218&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Nov 17 19:57:12 2014 @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import com.google.common.base.Preconditions; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +53,6 @@ import org.apache.hadoop.io.WritableComp import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.storage.StorageLevel; public class SparkPlanGenerator { private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class); @@ -133,8 +131,7 @@ public class SparkPlanGenerator { sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { List<BaseWork> parentWorks = sparkWork.getParents(work); - StorageLevel level = cloneToWork.containsKey(work) ? getStorageLevel(jobConf) : null; - result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), level); + result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work)); sparkPlan.addTran(result); for (BaseWork parentWork : parentWorks) { sparkPlan.connect(workToTranMap.get(parentWork), result); @@ -151,20 +148,6 @@ public class SparkPlanGenerator { return result; } - private StorageLevel getStorageLevel(JobConf jobConf) { - String storageLevel = jobConf.get("spark.storage.level"); - if (storageLevel == null || storageLevel.isEmpty()) { - return StorageLevel.MEMORY_AND_DISK(); - } - try { - return StorageLevel.fromString(storageLevel); - } catch (IllegalArgumentException iae) { - LOG.error("Invalid configuraiton for 'spark.storage.level': " - + storageLevel, iae); - throw iae; - } - } - private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException { // MergeFileWork is sub-class of MapWork, we don't need to distinguish here if (mWork.getInputformat() != null) { @@ -201,12 +184,11 @@ public class SparkPlanGenerator { JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); - StorageLevel level = cloneToWork.containsKey(mapWork) ? getStorageLevel(jobConf) : null; - MapInput result = new MapInput(hadoopRDD, level); + MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork)); return result; } - private ShuffleTran generate(SparkEdgeProperty edge, StorageLevel level) { + private ShuffleTran generate(SparkEdgeProperty edge, boolean toCache) { Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; @@ -217,7 +199,7 @@ public class SparkPlanGenerator { } else { shuffler = new GroupByShuffler(); } - return new ShuffleTran(shuffler, edge.getNumPartitions(), level); + return new ShuffleTran(shuffler, edge.getNumPartitions(), toCache); } private MapTran generate(MapWork mw) throws Exception {