Author: xuefu
Date: Tue Jul 29 21:31:40 2014
New Revision: 1614496

URL: http://svn.apache.org/r1614496
Log:
HIVE-7338: Create SparkPlanGenerator (missing new files in previous commit)
Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ChainedTran implements SparkTran {
+  private List<SparkTran> trans;
+
+  public ChainedTran(List<SparkTran> trans) {
+    this.trans = trans;
+  }
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    JavaPairRDD<BytesWritable, BytesWritable> result = input;
+    for (SparkTran tran : trans) {
+      result = tran.transform(result);
+    }
+    return result;
+  }
+
+}
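For orientation: ChainedTran folds a list of transformations over the input RDD, left to right. A minimal usage sketch follows (hypothetical driver code, not part of this commit; it assumes two SparkTran instances have already been built):

    package org.apache.hadoop.hive.ql.exec.spark;

    import java.util.Arrays;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical example: composes two transformations and applies them
    // in order via ChainedTran.
    public class ChainedTranExample {
      public static JavaPairRDD<BytesWritable, BytesWritable> run(
          SparkTran first, SparkTran second,
          JavaPairRDD<BytesWritable, BytesWritable> input) {
        // Equivalent to second.transform(first.transform(input)).
        SparkTran chained = new ChainedTran(Arrays.asList(first, second));
        return chained.transform(input);
      }
    }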
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class MapTran implements SparkTran {
+  private HiveMapFunction mapFunc;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.mapPartitionsToPair(mapFunc);
+  }
+
+  public void setMapFunction(HiveMapFunction mapFunc) {
+    this.mapFunc = mapFunc;
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ReduceTran implements SparkTran {
+  private HiveReduceFunction reduceFunc;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.mapPartitionsToPair(reduceFunc);
+  }
+
+  public void setReduceFunction(HiveReduceFunction redFunc) {
+    this.reduceFunc = redFunc;
+  }
+
+}
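Both MapTran and ReduceTran defer the real work to mapPartitionsToPair over a Hive function built from a serialized job configuration. A condensed sketch of the expected wiring (this restates what generate(MapWork) does in SparkPlanGenerator further down; the JobConf population is elided):

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper, not part of this commit: wires a MapTran the
    // same way SparkPlanGenerator.generate(MapWork) does below.
    public class MapTranWiring {
      public static MapTran wire(JobConf jobConf) {
        // The JobConf (already populated with the MapWork) is shipped to
        // the executors as Kryo-serialized bytes.
        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
        MapTran mapTran = new MapTran();
        mapTran.setMapFunction(new HiveMapFunction(confBytes));
        return mapTran;
      }
    }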
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ShuffleTran implements SparkTran {
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.partitionBy(new HashPartitioner(1));
+  }
+
+}
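Note that ShuffleTran hard-codes HashPartitioner(1), so all records land in a single reduce partition; the TODO in SparkPlanGenerator below indicates this is a placeholder. A hypothetical parameterized variant, for illustration only (not part of this commit):

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical variant: takes the reducer count as a constructor
    // argument instead of hard-coding a single partition.
    public class ParameterizedShuffleTran implements SparkTran {
      private final int numPartitions;

      public ParameterizedShuffleTran(int numPartitions) {
        this.numPartitions = numPartitions;
      }

      @Override
      public JavaPairRDD<BytesWritable, BytesWritable> transform(
          JavaPairRDD<BytesWritable, BytesWritable> input) {
        // Repartitions by key hash, which triggers a Spark shuffle.
        return input.partitionBy(new HashPartitioner(numPartitions));
      }
    }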
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class SparkPlan {
+  private JavaPairRDD<BytesWritable, BytesWritable> input;
+  private SparkTran tran;
+
+  public void execute() {
+    JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+    rdd.foreach(HiveVoidFunction.getInstance());
+  }
+
+  public SparkTran getTran() {
+    return tran;
+  }
+
+  public void setTran(SparkTran tran) {
+    this.tran = tran;
+  }
+
+  public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
+    return input;
+  }
+
+  public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
+    this.input = input;
+  }
+}
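SparkPlan.execute() ends with rdd.foreach(HiveVoidFunction.getInstance()): Spark transformations are lazy, so a terminal action is required to run the pipeline, and a no-op foreach is the cheapest one since the Hive operators write their output as a side effect. A generic sketch of the same pattern with plain Spark types (illustrative only; HiveVoidFunction itself is not shown in this commit):

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    // Hypothetical example: forces evaluation of a lazy RDD pipeline with
    // a do-nothing action, as SparkPlan.execute() does above.
    public class ForceEvaluation {
      public static <K, V> void force(JavaPairRDD<K, V> rdd) {
        rdd.foreach(new VoidFunction<Tuple2<K, V>>() {
          @Override
          public void call(Tuple2<K, V> tuple) {
            // Intentionally empty: output is produced as a side effect.
          }
        });
      }
    }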
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkPlanGenerator {
+  private JavaSparkContext sc;
+  private JobConf jobConf;
+  private Context context;
+  private Path scratchDir;
+
+  public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
+    this.sc = sc;
+    this.context = context;
+    this.jobConf = jobConf;
+    this.scratchDir = scratchDir;
+  }
+
+  public SparkPlan generate(SparkWork sparkWork) throws Exception {
+    SparkPlan plan = new SparkPlan();
+    List<SparkTran> trans = new ArrayList<SparkTran>();
+    Set<BaseWork> roots = sparkWork.getRoots();
+    // The current implementation handles only a single chain: one MapWork
+    // root, followed by alternating shuffle edges and ReduceWorks.
+    assert(roots != null && roots.size() == 1);
+    BaseWork w = roots.iterator().next();
+    MapWork mapWork = (MapWork) w;
+    trans.add(generate(w));
+    while (sparkWork.getChildren(w).size() > 0) {
+      BaseWork child = sparkWork.getChildren(w).get(0);
+      SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+      trans.add(generate(edge));
+      trans.add(generate(child));
+      w = child;
+    }
+    ChainedTran chainedTran = new ChainedTran(trans);
+    plan.setTran(chainedTran);
+    JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+    plan.setInput(input);
+    return plan;
+  }
+
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
+    List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
+    Utilities.setInputPaths(jobConf, inputPaths);
+    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+    Class ifClass = HiveInputFormat.class;
+
+    // The mapper class is expected by the HiveInputFormat.
+    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+  }
+
+  private SparkTran generate(BaseWork bw) throws IOException {
+    if (bw instanceof MapWork) {
+      return generate((MapWork) bw);
+    } else if (bw instanceof ReduceWork) {
+      return generate((ReduceWork) bw);
+    } else {
+      throw new IllegalArgumentException("Only MapWork and ReduceWork are expected");
+    }
+  }
+
+  private MapTran generate(MapWork mw) throws IOException {
+    MapTran result = new MapTran();
+    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.createTmpDirs(jobConf, mw);
+    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+    result.setMapFunction(mapFunc);
+    return result;
+  }
+
+  private ShuffleTran generate(SparkEdgeProperty edge) {
+    // TODO: based on edge type, create groupBy or sortBy transformations.
+    return new ShuffleTran();
+  }
+
+  private ReduceTran generate(ReduceWork rw) throws IOException {
+    ReduceTran result = new ReduceTran();
+    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
+    Utilities.createTmpDirs(jobConf, rw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
+    return result;
+  }
+
+}
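Putting the pieces together, the intended call sequence runs from a compiled SparkWork to an executed plan. A hypothetical driver, not part of this commit:

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.plan.SparkWork;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical example: generate() builds the RDD chain and execute()
    // triggers it, per the classes added above.
    public class SparkPlanDriver {
      public static void run(JavaSparkContext sc, Context context,
          JobConf jobConf, Path scratchDir, SparkWork sparkWork) throws Exception {
        SparkPlanGenerator generator =
            new SparkPlanGenerator(sc, context, jobConf, scratchDir);
        SparkPlan plan = generator.generate(sparkWork);
        plan.execute();
      }
    }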
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,9 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public interface SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input);
+}
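Finally, SparkTran is the single extension point all of the classes above build on: one RDD in, one RDD out. A hypothetical minimal implementation, e.g. as a placeholder stage while wiring up a plan (not part of this commit):

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical example: the smallest possible SparkTran.
    public class IdentityTran implements SparkTran {
      @Override
      public JavaPairRDD<BytesWritable, BytesWritable> transform(
          JavaPairRDD<BytesWritable, BytesWritable> input) {
        return input;  // pass the RDD through unchanged
      }
    }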