Author: xuefu Date: Fri Nov 21 14:06:08 2014 New Revision: 1640916 URL: http://svn.apache.org/r1640916 Log: HIVE-8868: SparkSession and SparkClient mapping [Spark Branch] (Rui via Xuefu)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1640916&r1=1640915&r2=1640916&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Fri Nov 21 14:06:08 2014 @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.hive.ql.exec.spark; import org.apache.commons.compress.utils.CharsetNames; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1640916&r1=1640915&r2=1640916&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java Fri Nov 21 14:06:08 2014 @@ -20,13 +20,14 @@ package org.apache.hadoop.hive.ql.exec.s import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; public interface SparkSession { /** * Initializes a Spark session for DAG execution. 
*/ - public void open(HiveConf conf); + public void open(HiveConf conf) throws HiveException; /** * Submit given <i>sparkWork</i> to SparkClient Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java?rev=1640916&r1=1640915&r2=1640916&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java Fri Nov 21 14:06:08 2014 @@ -26,15 +26,13 @@ import org.apache.hadoop.hive.ql.DriverC import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.spark.SparkException; import java.io.IOException; import java.util.UUID; -/** - * Simple implementation of <i>SparkSession</i> which currently just submits jobs to - * SparkClient which is shared by all SparkSession instances. 
- */ public class SparkSessionImpl implements SparkSession { private static final Log LOG = LogFactory.getLog(SparkSession.class); @@ -48,16 +46,19 @@ public class SparkSessionImpl implements } @Override - public void open(HiveConf conf) { + public void open(HiveConf conf) throws HiveException { this.conf = conf; isOpen = true; + try { + hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf); + } catch (Exception e) { + throw new HiveException("Failed to create spark client.", e); + } } @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs."); - Configuration hiveConf = driverContext.getCtx().getConf(); - hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(hiveConf); return hiveSparkClient.execute(driverContext, sparkWork); }