Author: xuefu
Date: Tue Jul 29 21:31:40 2014
New Revision: 1614496

URL: http://svn.apache.org/r1614496
Log:
HIVE-7338: Create SparkPlanGenerator(missing new files in previous commit)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,43 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ChainedTran implements SparkTran {
+  private List<SparkTran> trans;
+
+  public ChainedTran(List<SparkTran> trans) {
+    this.trans = trans;
+  }
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    JavaPairRDD<BytesWritable, BytesWritable> result = input;
+    for (SparkTran tran : trans) {
+      result = tran.transform(result);
+    }
+    return result;
+  }
+
+}
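
ChainedTran simply folds the input RDD through each SparkTran in list order, so the list handed to its constructor must already be in execution order (map, then shuffle, then reduce). A minimal sketch of composing it by hand, for illustration only; the IdentityTran helper and names below are hypothetical and not part of this commit:

package org.apache.hadoop.hive.ql.exec.spark;

import java.util.Arrays;

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;

public class ChainedTranSketch {

  // Hypothetical no-op transformation, used only to show the chaining contract.
  static class IdentityTran implements SparkTran {
    @Override
    public JavaPairRDD<BytesWritable, BytesWritable> transform(
        JavaPairRDD<BytesWritable, BytesWritable> input) {
      return input;
    }
  }

  public static JavaPairRDD<BytesWritable, BytesWritable> run(
      JavaPairRDD<BytesWritable, BytesWritable> input) {
    // Each SparkTran receives the output of the previous one.
    SparkTran chained = new ChainedTran(
        Arrays.<SparkTran>asList(new IdentityTran(), new IdentityTran()));
    return chained.transform(input);
  }
}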

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class MapTran implements SparkTran {
+  private HiveMapFunction mapFunc;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.mapPartitionsToPair(mapFunc);
+  }
+
+  public void setMapFunction(HiveMapFunction mapFunc) {
+    this.mapFunc = mapFunc;
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ReduceTran implements SparkTran {
+  private HiveReduceFunction reduceFunc;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.mapPartitionsToPair(reduceFunc);
+  }
+
+  public void setReduceFunction(HiveReduceFunction redFunc) {
+    this.reduceFunc = redFunc;
+  }
+
+}
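
Both MapTran and ReduceTran delegate to mapPartitionsToPair, so HiveMapFunction and HiveReduceFunction (presumably part of the earlier HIVE-7338 commit) are expected to be Spark PairFlatMapFunctions that run the map-side and reduce-side operator trees over an iterator of key/value pairs. A rough sketch of the shape such a function takes under the Spark 1.x Java API; this pass-through class is hypothetical and is not the actual Hive implementation:

package org.apache.hadoop.hive.ql.exec.spark;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

// Illustrative only: a pass-through partition function with the generic
// signature that mapPartitionsToPair expects in MapTran/ReduceTran.
public class PassThroughPartitionFunction implements
    PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
        BytesWritable, BytesWritable> {

  @Override
  public Iterable<Tuple2<BytesWritable, BytesWritable>> call(
      Iterator<Tuple2<BytesWritable, BytesWritable>> records) {
    // The real HiveMapFunction/HiveReduceFunction would deserialize the
    // JobConf, drive ExecMapper/ExecReducer over the records, and emit the
    // operator tree's output; this sketch just forwards the input records.
    List<Tuple2<BytesWritable, BytesWritable>> out =
        new ArrayList<Tuple2<BytesWritable, BytesWritable>>();
    while (records.hasNext()) {
      out.add(records.next());
    }
    return out;
  }
}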

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,33 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ShuffleTran implements SparkTran {
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.partitionBy(new HashPartitioner(1));
+  }
+
+}
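
ShuffleTran currently hard-codes HashPartitioner(1), so all shuffled records land in a single partition; the TODO in SparkPlanGenerator.generate(SparkEdgeProperty) below notes that the shuffle should eventually depend on the edge type. A hedged sketch of how the partition count might be made configurable; this variant class and its constructor are hypothetical, not part of this commit:

package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

// Hypothetical variant of ShuffleTran where the number of shuffle partitions
// comes from the edge or reduce-side configuration instead of being fixed at 1.
public class ConfigurableShuffleTran implements SparkTran {
  private final int numPartitions;

  public ConfigurableShuffleTran(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public JavaPairRDD<BytesWritable, BytesWritable> transform(
      JavaPairRDD<BytesWritable, BytesWritable> input) {
    // partitionBy performs the shuffle; HashPartitioner picks the target
    // partition from the key's hashCode.
    return input.partitionBy(new HashPartitioner(numPartitions));
  }
}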

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,48 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class SparkPlan {
+  private JavaPairRDD<BytesWritable, BytesWritable> input;
+  private SparkTran tran;
+
+  public void execute() {
+    JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+    rdd.foreach(HiveVoidFunction.getInstance());
+  }
+
+  public SparkTran getTran() {
+    return tran;
+  }
+
+  public void setTran(SparkTran tran) {
+    this.tran = tran;
+  }
+
+  public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
+    return input;
+  }
+
+  public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
+    this.input = input;
+  }
+}
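
SparkPlan is the executable end product: the generator wires an input RDD and a (possibly chained) SparkTran into it, and execute() materializes the pipeline by calling foreach with HiveVoidFunction (expected from the earlier HIVE-7338 commit). A minimal driver sketch, for illustration only; the helper class and parameter names are hypothetical:

package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;

public final class SparkPlanDriverSketch {

  // Illustrative only: wires an already-built input RDD and transformation
  // into a SparkPlan and runs it. In practice SparkPlanGenerator (below)
  // produces the plan directly from a SparkWork.
  public static void run(JavaPairRDD<BytesWritable, BytesWritable> inputRDD,
      SparkTran tran) {
    SparkPlan plan = new SparkPlan();
    plan.setInput(inputRDD);
    plan.setTran(tran);
    plan.execute();  // triggers the Spark job via rdd.foreach(...)
  }

  private SparkPlanDriverSketch() {
  }
}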

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,125 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkPlanGenerator {
+  private JavaSparkContext sc;
+  private JobConf jobConf;
+  private Context context;
+  private Path scratchDir;
+
+  public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
+    this.sc = sc;
+    this.context = context;
+    this.jobConf = jobConf;
+    this.scratchDir = scratchDir;
+  }
+
+  public SparkPlan generate(SparkWork sparkWork) throws Exception {
+    SparkPlan plan = new SparkPlan();
+    List<SparkTran> trans = new ArrayList<SparkTran>();
+    Set<BaseWork> roots = sparkWork.getRoots();
+    assert(roots != null && roots.size() == 1);
+    BaseWork w = roots.iterator().next();
+    MapWork mapWork = (MapWork) w;
+    trans.add(generate(w));
+    while (sparkWork.getChildren(w).size() > 0) {
+      BaseWork child = sparkWork.getChildren(w).get(0);
+      SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+      trans.add(generate(edge));
+      trans.add(generate(child));
+      w = child;
+    }
+    ChainedTran chainedTran = new ChainedTran(trans);
+    plan.setTran(chainedTran);
+    JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+    plan.setInput(input);
+    return plan;
+  }
+
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
+    List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
+    Utilities.setInputPaths(jobConf, inputPaths);
+    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+    Class ifClass = HiveInputFormat.class;
+
+    // The mapper class is expected by the HiveInputFormat.
+    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+  }
+
+  private SparkTran generate(BaseWork bw) throws IOException {
+    if (bw instanceof MapWork) {
+      return generate((MapWork)bw);
+    } else if (bw instanceof ReduceWork) {
+      return generate((ReduceWork)bw);
+    } else {
+      throw new IllegalArgumentException("Only MapWork and ReduceWork are expected");
+    }
+  }
+
+  private MapTran generate(MapWork mw) throws IOException {
+    MapTran result = new MapTran();
+    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.createTmpDirs(jobConf, mw);
+    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+    result.setMapFunction(mapFunc);
+    return result;
+  }
+
+  private ShuffleTran generate(SparkEdgeProperty edge) {
+    // TODO: based on edge type, create groupBy or sortBy transformations.
+    return new ShuffleTran();
+  }
+
+  private ReduceTran generate(ReduceWork rw) throws IOException {
+    ReduceTran result = new ReduceTran();
+    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
+    Utilities.createTmpDirs(jobConf, rw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(reduceFunc);
+    return result;
+  }
+
+}
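
SparkPlanGenerator compiles a SparkWork, assumed here to be a single map-shuffle-reduce chain (see the assert on the root set), into a SparkPlan. A hedged sketch of how a caller such as a Hive-on-Spark task might drive it; the SparkConf settings, class name, and parameters below are illustrative, not part of this commit:

package org.apache.hadoop.hive.ql.exec.spark;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public final class SparkPlanGeneratorSketch {

  // Illustrative only: compiles a SparkWork into a SparkPlan and executes it.
  // The caller is assumed to already hold a query Context, a configured
  // JobConf, and a scratch directory, as the real Hive-on-Spark task would.
  public static void compileAndRun(SparkWork sparkWork, Context context,
      JobConf jobConf, Path scratchDir) throws Exception {
    SparkConf sparkConf = new SparkConf()
        .setAppName("Hive on Spark (sketch)")
        .setMaster("local");  // hypothetical; the real master comes from config
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    try {
      SparkPlanGenerator generator =
          new SparkPlanGenerator(sc, context, jobConf, scratchDir);
      SparkPlan plan = generator.generate(sparkWork);
      plan.execute();
    } finally {
      sc.stop();
    }
  }

  private SparkPlanGeneratorSketch() {
  }
}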

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1614496&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Tue Jul 29 21:31:40 2014
@@ -0,0 +1,9 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public interface SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input);
+}

