Repository: spark
Updated Branches:
  refs/heads/master 594a1bf20 -> 0f61d6efb


[SPARK-15552][SQL] Remove unnecessary private[sql] methods in SparkSession

## What changes were proposed in this pull request?
SparkSession has a number of unnecessary private[sql] methods. These methods
cause trouble because private[sql] is not enforced when they are called from
Java (Scala package-qualified visibility compiles to public bytecode). Where
they are easy to remove, this patch simply removes them.

As part of this pull request, I also replaced a number of protected[sql]
modifiers with private[sql] to tighten up visibility.
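For reference, a minimal sketch (not part of the patch) of how internal callers migrate
once the forwarders are gone: the removed SparkSession shortcuts are reached through
`sessionState` and `sharedState` instead. Since those members are `private[sql]`, the
sketch assumes it is compiled inside the `org.apache.spark.sql` package; the object name
and SQL text are illustrative only.

```scala
// Hypothetical caller inside the org.apache.spark.sql package; sessionState
// and sharedState are private[sql], so external code cannot do this.
package org.apache.spark.sql

object Spark15552MigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-15552 migration sketch")
      .getOrCreate()

    // Before: spark.parseSql(...) / spark.executePlan(...)  (removed forwarders)
    // After:  go through sessionState explicitly.
    val plan = spark.sessionState.sqlParser.parsePlan("SELECT 1 AS id")
    val qe = spark.sessionState.executePlan(plan)
    qe.assertAnalyzed()

    // Before: spark.cacheManager / spark.listener  (removed forwarders)
    // After:  reach shared components through sharedState.
    println(s"cache empty? ${spark.sharedState.cacheManager.isEmpty}")

    spark.stop()
  }
}
```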

## How was this patch tested?
Updated test cases to reflect the changes.

Author: Reynold Xin <r...@databricks.com>

Closes #13319 from rxin/SPARK-15552.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f61d6ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f61d6ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f61d6ef

Branch: refs/heads/master
Commit: 0f61d6efb45b9ee94fa663f67c4489fbdae2eded
Parents: 594a1bf
Author: Reynold Xin <r...@databricks.com>
Authored: Thu May 26 13:03:07 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 26 13:03:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../scala/org/apache/spark/sql/Dataset.scala    | 32 ++++++------
 .../scala/org/apache/spark/sql/SQLContext.scala | 49 +++---------------
 .../org/apache/spark/sql/SparkSession.scala     | 54 ++++++++------------
 .../spark/sql/execution/CacheManager.scala      |  2 +-
 .../spark/sql/execution/QueryExecution.scala    |  2 +-
 .../spark/sql/execution/command/commands.scala  |  2 +-
 .../spark/sql/execution/command/ddl.scala       |  3 +-
 .../spark/sql/execution/command/views.scala     |  6 +--
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../InsertIntoDataSourceCommand.scala           |  2 +-
 .../apache/spark/sql/internal/CatalogImpl.scala | 24 ++++-----
 .../spark/sql/internal/SessionState.scala       |  4 +-
 .../org/apache/spark/sql/CachedTableSuite.scala | 16 +++---
 .../apache/spark/sql/DataFramePivotSuite.scala  |  2 +-
 .../apache/spark/sql/DatasetCacheSuite.scala    | 12 +++--
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  8 +--
 .../columnar/InMemoryColumnarQuerySuite.scala   |  8 +--
 .../sql/execution/metric/SQLMetricsSuite.scala  | 18 ++++---
 .../sql/execution/ui/SQLListenerSuite.scala     | 16 +++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  4 +-
 .../sql/hive/thriftserver/SparkSQLDriver.scala  |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  7 ++-
 .../execution/CreateTableAsSelectCommand.scala  |  2 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  2 +-
 .../spark/sql/hive/ErrorPositionSuite.scala     |  2 +-
 .../spark/sql/hive/ShowCreateTableSuite.scala   |  4 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |  2 +-
 .../hive/execution/ConcurrentHiveSuite.scala    |  6 +--
 29 files changed, 129 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3aacce7..2e85e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -402,7 +402,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       Project(inputDataCols ++ inputPartCols, df.logicalPlan)
     }.getOrElse(df.logicalPlan)
 
-    df.sparkSession.executePlan(
+    df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
@@ -524,7 +524,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
             mode,
             extraOptions.toMap,
             df.logicalPlan)
-        df.sparkSession.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e5140fc..961ae32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -58,7 +58,7 @@ private[sql] object Dataset {
   }
 
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
-    val qe = sparkSession.executePlan(logicalPlan)
+    val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
     new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
   }
@@ -165,14 +165,14 @@ class Dataset[T] private[sql](
   // you wrap it with `withNewExecutionId` if this actions doesn't call other 
action.
 
   def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
 -    this(sparkSession, sparkSession.executePlan(logicalPlan), encoder)
 +    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
   }
 
   def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
     this(sqlContext.sparkSession, logicalPlan, encoder)
   }
 
-  @transient protected[sql] val logicalPlan: LogicalPlan = {
+  @transient private[sql] val logicalPlan: LogicalPlan = {
     def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
       case _: Command |
            _: InsertIntoTable |
@@ -215,7 +215,7 @@ class Dataset[T] private[sql](
   // sqlContext must be val because a stable identifier is expected when you import implicits
   @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext
 
-  protected[sql] def resolve(colName: String): NamedExpression = {
+  private[sql] def resolve(colName: String): NamedExpression = {
     queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver)
       .getOrElse {
         throw new AnalysisException(
@@ -223,7 +223,7 @@ class Dataset[T] private[sql](
       }
   }
 
-  protected[sql] def numericColumns: Seq[Expression] = {
+  private[sql] def numericColumns: Seq[Expression] = {
     schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
       queryExecution.analyzed.resolveQuoted(n.name, sparkSession.sessionState.analyzer.resolver).get
     }
@@ -417,7 +417,7 @@ class Dataset[T] private[sql](
    */
   def explain(extended: Boolean): Unit = {
     val explain = ExplainCommand(queryExecution.logical, extended = extended)
 -    sparkSession.executePlan(explain).executedPlan.executeCollect().foreach {
 +    sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       // scalastyle:off println
       r => println(r.getString(0))
       // scalastyle:on println
@@ -641,7 +641,7 @@ class Dataset[T] private[sql](
   def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
-    val joined = sparkSession.executePlan(
+    val joined = sparkSession.sessionState.executePlan(
       Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
       .analyzed.asInstanceOf[Join]
 
@@ -757,7 +757,7 @@ class Dataset[T] private[sql](
     val left = this.logicalPlan
     val right = other.logicalPlan
 
-    val joined = sparkSession.executePlan(Join(left, right, joinType =
 +    val joined = sparkSession.sessionState.executePlan(Join(left, right, joinType =
       JoinType(joinType), Some(condition.expr)))
     val leftOutput = joined.analyzed.output.take(left.output.length)
     val rightOutput = joined.analyzed.output.takeRight(right.output.length)
@@ -1263,7 +1263,7 @@ class Dataset[T] private[sql](
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
     val inputPlan = logicalPlan
     val withGroupingKey = AppendColumns(func, inputPlan)
-    val executed = sparkSession.executePlan(withGroupingKey)
+    val executed = sparkSession.sessionState.executePlan(withGroupingKey)
 
     new KeyValueGroupedDataset(
       encoderFor[K],
@@ -2238,7 +2238,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(): this.type = {
-    sparkSession.cacheManager.cacheQuery(this)
+    sparkSession.sharedState.cacheManager.cacheQuery(this)
     this
   }
 
@@ -2260,7 +2260,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def persist(newLevel: StorageLevel): this.type = {
-    sparkSession.cacheManager.cacheQuery(this, None, newLevel)
+    sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
     this
   }
 
@@ -2273,7 +2273,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.cacheManager.tryUncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
     this
   }
 
@@ -2294,7 +2294,7 @@ class Dataset[T] private[sql](
   lazy val rdd: RDD[T] = {
     val objectType = unresolvedTEncoder.deserializer.dataType
     val deserialized = CatalystSerde.deserialize[T](logicalPlan)
-    sparkSession.executePlan(deserialized).toRdd.mapPartitions { rows =>
 +    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
     }
   }
@@ -2417,19 +2417,19 @@ class Dataset[T] private[sql](
   /**
    * Converts a JavaRDD to a PythonRDD.
    */
-  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+  private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val structType = schema  // capture it for closure
     val rdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType))
     EvaluatePython.javaToPython(rdd)
   }
 
-  protected[sql] def collectToPython(): Int = {
+  private[sql] def collectToPython(): Int = {
     withNewExecutionId {
       PythonRDD.collectAndServe(javaToPython.rdd)
     }
   }
 
-  protected[sql] def toPythonIterator(): Int = {
+  private[sql] def toPythonIterator(): Int = {
     withNewExecutionId {
       PythonRDD.toLocalIteratorAndServe(javaToPython.rdd)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 7013e31..b17fb8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -83,12 +83,9 @@ class SQLContext private[sql](
 
   // TODO: move this logic into SparkSession
 
-  protected[sql] def sessionState: SessionState = sparkSession.sessionState
-  protected[sql] def sharedState: SharedState = sparkSession.sharedState
-  protected[sql] def conf: SQLConf = sessionState.conf
-  protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
-  protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
 -  protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog
+  private[sql] def sessionState: SessionState = sparkSession.sessionState
+  private[sql] def sharedState: SharedState = sparkSession.sharedState
+  private[sql] def conf: SQLConf = sessionState.conf
 
   def sparkContext: SparkContext = sparkSession.sparkContext
 
@@ -167,14 +164,6 @@ class SQLContext private[sql](
     sparkSession.conf.getAll
   }
 
 -  protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)
 -
 -  protected[sql] def executeSql(sql: String): QueryExecution = sparkSession.executeSql(sql)
-
-  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
-    sparkSession.executePlan(plan)
-  }
-
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
@@ -241,15 +230,6 @@ class SQLContext private[sql](
   }
 
   /**
-   * Returns true if the [[Dataset]] is currently cached in-memory.
-   * @group cachemgmt
-   * @since 1.3.0
-   */
-  private[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
-  }
-
-  /**
    * Caches the specified table in-memory.
    * @group cachemgmt
    * @since 1.3.0
@@ -718,26 +698,9 @@ class SQLContext private[sql](
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
    */
-  protected[sql] def parseDataType(dataTypeString: String): DataType = {
-    sparkSession.parseDataType(dataTypeString)
-  }
-
-  /**
 -   * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
-   */
-  protected[sql] def applySchemaToPythonRDD(
-      rdd: RDD[Array[Any]],
-      schemaString: String): DataFrame = {
-    sparkSession.applySchemaToPythonRDD(rdd, schemaString)
-  }
-
-  /**
 -   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
-   */
-  protected[sql] def applySchemaToPythonRDD(
-      rdd: RDD[Array[Any]],
-      schema: StructType): DataFrame = {
-    sparkSession.applySchemaToPythonRDD(rdd, schema)
+  // TODO: Remove this function (would require updating PySpark).
+  private[sql] def parseDataType(dataTypeString: String): DataType = {
+    DataType.fromJson(dataTypeString)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 86c97b9..a36368a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -34,10 +34,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
@@ -51,7 +50,14 @@ import org.apache.spark.util.Utils
 /**
  * The entry point to programming Spark with the Dataset and DataFrame API.
  *
- * To create a SparkSession, use the following builder pattern:
 + * In environments that this has been created upfront (e.g. REPL, notebooks), use the builder
+ * to get an existing session:
+ *
+ * {{{
+ *   SparkSession.builder().getOrCreate()
+ * }}}
+ *
+ * The builder can also be used to create a new session:
  *
  * {{{
  *   SparkSession.builder()
@@ -81,7 +87,7 @@ class SparkSession private(
    * and a catalog that interacts with external systems.
    */
   @transient
-  protected[sql] lazy val sharedState: SharedState = {
+  private[sql] lazy val sharedState: SharedState = {
     existingSharedState.getOrElse(
       SparkSession.reflect[SharedState, SparkContext](
         SparkSession.sharedStateClassName(sparkContext.conf),
@@ -93,7 +99,7 @@ class SparkSession private(
  * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
    */
   @transient
-  protected[sql] lazy val sessionState: SessionState = {
+  private[sql] lazy val sessionState: SessionState = {
     SparkSession.reflect[SessionState, SparkSession](
       SparkSession.sessionStateClassName(sparkContext.conf),
       self)
@@ -105,10 +111,6 @@ class SparkSession private(
   @transient
   private[sql] val sqlContext: SQLContext = new SQLContext(this)
 
-  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
-  protected[sql] def listener: SQLListener = sharedState.listener
 -  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
   /**
    * Runtime configuration interface for Spark.
    *
@@ -178,12 +180,14 @@ class SparkSession private(
   def udf: UDFRegistration = sessionState.udf
 
   /**
+   * :: Experimental ::
    * Returns a [[ContinuousQueryManager]] that allows managing all the
    * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
    *
    * @group basic
    * @since 2.0.0
    */
+  @Experimental
   def streams: ContinuousQueryManager = sessionState.continuousQueryManager
 
   /**
@@ -208,13 +212,11 @@ class SparkSession private(
    * --------------------------------- */
 
   /**
-   * :: Experimental ::
    * Returns a [[DataFrame]] with no rows or columns.
    *
    * @group dataframes
    * @since 2.0.0
    */
-  @Experimental
   @transient
   lazy val emptyDataFrame: DataFrame = {
     createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
@@ -449,7 +451,7 @@ class SparkSession private(
    * Creates a [[DataFrame]] from an RDD[Row].
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
-  protected[sql] def internalCreateDataFrame(
+  private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
       schema: StructType): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
@@ -462,7 +464,7 @@ class SparkSession private(
    * Creates a [[DataFrame]] from an RDD[Row].
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
-  protected[sql] def createDataFrame(
+  private[sql] def createDataFrame(
       rowRDD: RDD[Row],
       schema: StructType,
       needsConversion: Boolean) = {
@@ -502,7 +504,7 @@ class SparkSession private(
     table(sessionState.sqlParser.parseTableIdentifier(tableName))
   }
 
-  protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
+  private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
     Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
   }
 
@@ -510,7 +512,7 @@ class SparkSession private(
    * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
    * this [[SparkSession]].
    */
-  protected[sql] def createTempView(
+  private[sql] def createTempView(
       viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
     sessionState.catalog.createTempView(
       sessionState.sqlParser.parseTableIdentifier(viewName).table,
@@ -529,11 +531,10 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
-    Dataset.ofRows(self, parseSql(sqlText))
+    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
   }
 
   /**
-   * :: Experimental ::
    * Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
    * {{{
    *   sparkSession.read.parquet("/path/to/file.parquet")
@@ -543,7 +544,6 @@ class SparkSession private(
    * @group genericdata
    * @since 2.0.0
    */
-  @Experimental
   def read: DataFrameReader = new DataFrameReader(self)
 
 
@@ -577,18 +577,6 @@ class SparkSession private(
     sparkContext.stop()
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = {
-    sessionState.sqlParser.parsePlan(sql)
-  }
-
-  protected[sql] def executeSql(sql: String): QueryExecution = {
-    executePlan(parseSql(sql))
-  }
-
-  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
-    sessionState.executePlan(plan)
-  }
-
   /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.
@@ -601,17 +589,17 @@ class SparkSession private(
   /**
    * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
    */
-  protected[sql] def applySchemaToPythonRDD(
+  private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schemaString: String): DataFrame = {
-    val schema = parseDataType(schemaString).asInstanceOf[StructType]
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     applySchemaToPythonRDD(rdd, schema)
   }
 
   /**
    * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
    */
-  protected[sql] def applySchemaToPythonRDD(
+  private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f601138..c8bdb0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -95,7 +95,7 @@ private[sql] class CacheManager extends Logging {
             sparkSession.sessionState.conf.useCompression,
             sparkSession.sessionState.conf.columnBatchSize,
             storageLevel,
-            sparkSession.executePlan(planToCache).executedPlan,
+            sparkSession.sessionState.executePlan(planToCache).executedPlan,
             tableName))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 34187b9..330459c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -67,7 +67,7 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
     assertSupported()
-    sparkSession.cacheManager.useCachedData(analyzed)
+    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }
 
   lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d5aaccc..642a95a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -98,7 +98,7 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
-    val queryExecution = sparkSession.executePlan(logicalPlan)
+    val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ffea628..7ce7bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -200,7 +200,8 @@ case class DropTableCommand(
         case _ =>
       })
       try {
-        
sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName.quotedString))
+        sparkSession.sharedState.cacheManager.tryUncacheQuery(
+          sparkSession.table(tableName.quotedString))
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 075849a..8499011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.executePlan(child)
+    val qe = sparkSession.sessionState.executePlan(child)
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed
 
@@ -132,7 +132,7 @@ case class CreateViewCommand(
         val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
           case (attr, col) => Alias(attr, col.name)()
         }
-        sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
 +        sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
       }
     }
 
@@ -153,7 +153,7 @@ case class CreateViewCommand(
             val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
               case (attr, col) => Alias(attr, col.name)()
             }
 -            sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
 +            sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
           }
         new SQLBuilder(logicalPlan).toSQL
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dfe0647..b3beb6c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -476,7 +476,7 @@ case class DataSource(
             options,
             data.logicalPlan,
             mode)
-        sparkSession.executePlan(plan).toRdd
+        sparkSession.sessionState.executePlan(plan).toRdd
 
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index c3e07f7..25b901f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -40,7 +40,7 @@ private[sql] case class InsertIntoDataSourceCommand(
     relation.insert(df, overwrite)
 
     // Invalidate the cache.
-    sparkSession.cacheManager.invalidateCache(logicalRelation)
+    sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
 
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1371abe..f3f36ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -230,7 +230,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
         bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
-    sparkSession.executePlan(cmd).toRdd
+    sparkSession.sessionState.executePlan(cmd).toRdd
     sparkSession.table(tableIdent)
   }
 
@@ -278,7 +278,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
         bucketSpec = None,
         allowExisting = false,
         managedIfNoPath = false)
-    sparkSession.executePlan(cmd).toRdd
+    sparkSession.sessionState.executePlan(cmd).toRdd
     sparkSession.table(tableIdent)
   }
 
@@ -291,7 +291,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def dropTempView(viewName: String): Unit = {
 -    sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
 +    sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
      sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
   }
 
@@ -302,7 +302,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def isCached(tableName: String): Boolean = {
 -    sparkSession.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
 +    sparkSession.sharedState.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
   }
 
   /**
@@ -312,7 +312,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def cacheTable(tableName: String): Unit = {
 -    sparkSession.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
 +    sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
   }
 
   /**
@@ -322,7 +322,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
 -    sparkSession.cacheManager.uncacheQuery(sparkSession.table(tableName))
 +    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
   }
 
   /**
@@ -332,7 +332,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   override def clearCache(): Unit = {
-    sparkSession.cacheManager.clearCache()
+    sparkSession.sharedState.cacheManager.clearCache()
   }
 
   /**
@@ -342,7 +342,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * @since 2.0.0
    */
   protected[sql] def isCached(qName: Dataset[_]): Boolean = {
-    sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
+    sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
   /**
@@ -360,15 +360,15 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
     // cached version and make the new version cached lazily.
     val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
 -    val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
 +    val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
       // Create a data frame to represent the table.
       // TODO: Use uncacheTable once it supports database name.
       val df = Dataset.ofRows(sparkSession, logicalPlan)
       // Uncache the logicalPlan.
 -      sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
 +      sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true)
       // Cache it again.
 -      sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
 +      sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
     }
   }
 
@@ -383,7 +383,7 @@ private[sql] object CatalogImpl {
     val enc = ExpressionEncoder[T]()
     val encoded = data.map(d => enc.toRow(d).copy())
     val plan = new LocalRelation(enc.schema.toAttributes, encoded)
-    val queryExecution = sparkSession.executePlan(plan)
+    val queryExecution = sparkSession.sessionState.executePlan(plan)
     new Dataset[T](sparkSession, queryExecution, enc)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c9cc2ba..4c7bbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -92,7 +92,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * Internal catalog for managing table and database states.
    */
   lazy val catalog = new SessionCatalog(
-    sparkSession.externalCatalog,
+    sparkSession.sharedState.externalCatalog,
     functionResourceLoader,
     functionRegistry,
     conf,
@@ -161,6 +161,8 @@ private[sql] class SessionState(sparkSession: SparkSession) 
{
   //  Helper methods, partially leftover from pre-2.0 days
   // ------------------------------------------------------
 
 +  def executeSql(sql: String): QueryExecution = executePlan(sqlParser.parsePlan(sql))
 +
    def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
 
   def invalidateTable(tableName: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1c96bdc..e08a9ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -79,17 +79,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
   }
 
   test("unpersist an uncached table will not raise exception") {
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = true)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = false)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.persist()
-    assert(None != spark.cacheManager.lookupCachedData(testData))
+    assert(None != spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = true)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
     testData.unpersist(blocking = false)
-    assert(None == spark.cacheManager.lookupCachedData(testData))
+    assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
   }
 
   test("cache table as select") {
@@ -311,14 +311,14 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils with SharedSQLContext
     spark.catalog.cacheTable("t1")
     spark.catalog.cacheTable("t2")
     spark.catalog.clearCache()
-    assert(spark.cacheManager.isEmpty)
+    assert(spark.sharedState.cacheManager.isEmpty)
 
     sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
     sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
     spark.catalog.cacheTable("t1")
     spark.catalog.cacheTable("t2")
     sql("Clear CACHE")
-    assert(spark.cacheManager.isEmpty)
+    assert(spark.sharedState.cacheManager.isEmpty)
   }
 
   test("Clear accumulators when uncacheTable to prevent memory leaking") {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index fa8fa06..d5cb5e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -104,7 +104,7 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext{
       // pivot with extra columns to trigger optimization
       .pivot("course", Seq("dotNET", "Java") ++ (1 to 10).map(_.toString))
       .agg(sum($"earnings"))
-    val queryExecution = spark.executePlan(df.queryExecution.logical)
 +    val queryExecution = spark.sessionState.executePlan(df.queryExecution.logical)
     assert(queryExecution.simpleString.contains("pivotfirst"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 8c0906b..ac9f6c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -39,7 +39,8 @@ class DatasetCacheSuite extends QueryTest with 
SharedSQLContext {
       2, 3, 4)
     // Drop the cache.
     cached.unpersist()
 -    assert(spark.cacheManager.lookupCachedData(cached).isEmpty, "The Dataset should not be cached.")
+    assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty,
+      "The Dataset should not be cached.")
   }
 
   test("persist and then rebind right encoder when join 2 datasets") {
@@ -56,10 +57,10 @@ class DatasetCacheSuite extends QueryTest with 
SharedSQLContext {
     assertCached(joined, 2)
 
     ds1.unpersist()
-    assert(spark.cacheManager.lookupCachedData(ds1).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty,
       "The Dataset ds1 should not be cached.")
     ds2.unpersist()
-    assert(spark.cacheManager.lookupCachedData(ds2).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty,
       "The Dataset ds2 should not be cached.")
   }
 
@@ -75,9 +76,10 @@ class DatasetCacheSuite extends QueryTest with 
SharedSQLContext {
     assertCached(agged.filter(_._1 == "b"))
 
     ds.unpersist()
 -    assert(spark.cacheManager.lookupCachedData(ds).isEmpty, "The Dataset ds should not be cached.")
+    assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty,
+      "The Dataset ds should not be cached.")
     agged.unpersist()
-    assert(spark.cacheManager.lookupCachedData(agged).isEmpty,
+    assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty,
       "The Dataset agged should not be cached.")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5583673..cbf4a8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -60,7 +60,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
 
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
       SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
@@ -113,7 +113,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 //  }
 
   test("broadcasted hash join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
     Seq(
       ("SELECT * FROM testData join testData2 ON key = a",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("broadcasted hash outer join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
     sql("CACHE TABLE testData2")
     Seq(
@@ -450,7 +450,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 
   test("broadcasted existence join operator selection") {
-    spark.cacheManager.clearCache()
+    spark.sharedState.cacheManager.clearCache()
     sql("CACHE TABLE testData")
 
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e2fb913..af3ed14 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,7 +33,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
   setupTestData()
 
   test("simple columnar query") {
-    val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+    val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -50,7 +50,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
   }
 
   test("projection") {
 -    val plan = spark.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
 +    val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().map {
@@ -59,7 +59,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
   }
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
-    val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+    val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -202,7 +202,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
     cached.count()
 
     // Make sure, the DataFrame is indeed cached.
-    assert(spark.cacheManager.lookupCachedData(cached).nonEmpty)
+    assert(spark.sharedState.cacheManager.lookupCachedData(cached).nonEmpty)
 
     // Check result.
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 12940c8..7e9160f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -71,21 +71,22 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
       df: DataFrame,
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-    val previousExecutionIds = spark.listener.executionIdToData.keySet
 +    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
     withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
       df.collect()
     }
     sparkContext.listenerBus.waitUntilEmpty(10000)
 -    val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
 +    val executionIds =
 +      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
     assert(executionIds.size === 1)
     val executionId = executionIds.head
-    val jobs = spark.listener.getExecution(executionId).get.jobs
+    val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
     // Use "<=" because there is a race condition that we may miss some jobs
     // TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
     assert(jobs.size <= expectedNumOfJobs)
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
-      val metricValues = spark.listener.getExecutionMetrics(executionId)
 +      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
         df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedMetrics.contains(node.id)
@@ -283,19 +284,20 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
 
   test("save metrics") {
     withTempPath { file =>
-      val previousExecutionIds = spark.listener.executionIdToData.keySet
 +      val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
       // Assume the execution plan is
       // PhysicalRDD(nodeId = 0)
       person.select('name).write.format("json").save(file.getAbsolutePath)
       sparkContext.listenerBus.waitUntilEmpty(10000)
 -      val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
 +      val executionIds =
 +        spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
       assert(executionIds.size === 1)
       val executionId = executionIds.head
-      val jobs = spark.listener.getExecution(executionId).get.jobs
+      val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
       // Use "<=" because there is a race condition that we may miss some jobs
       // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
       assert(jobs.size <= 1)
 -      val metricValues = spark.listener.getExecutionMetrics(executionId)
 +      val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
       // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
       // However, we still can check the value.
       assert(metricValues.values.toSeq.exists(_ === "2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2374ffa..cf7e976 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -340,16 +340,16 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
   }
 
   test("SPARK-11126: no memory leak when running non SQL jobs") {
-    val previousStageNumber = spark.listener.stageIdToStageMetrics.size
 +    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
     spark.sparkContext.parallelize(1 to 10).foreach(i => ())
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // listener should ignore the non SQL stage
-    assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber)
 +    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
 
     spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // listener should save the SQL stage
 -    assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
 +    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
   }
 
   test("SPARK-13055: history listener only tracks SQL metrics") {
@@ -418,12 +418,12 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
           }
         }
         sc.listenerBus.waitUntilEmpty(10000)
-        assert(spark.listener.getCompletedExecutions.size <= 50)
-        assert(spark.listener.getFailedExecutions.size <= 50)
+        assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
+        assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
         // 50 for successful executions and 50 for failed executions
-        assert(spark.listener.executionIdToData.size <= 100)
-        assert(spark.listener.jobIdToExecutionId.size <= 100)
-        assert(spark.listener.stageIdToStageMetrics.size <= 100)
+        assert(spark.sharedState.listener.executionIdToData.size <= 100)
+        assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
+        assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
       } finally {
         sc.stop()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c9abfe..abb7918 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -639,7 +639,7 @@ class JDBCSuite extends SparkFunSuite
   test("test credentials in the properties are not in plan output") {
     val df = sql("SELECT * FROM parts")
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
 -    spark.executePlan(explain).executedPlan.executeCollect().foreach {
 +    spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }
     // test the JdbcRelation toString output
@@ -651,7 +651,7 @@ class JDBCSuite extends SparkFunSuite
   test("test credentials in the connection url are not in the plan output") {
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
     val explain = ExplainCommand(df.queryExecution.logical, extended = true)
 -    spark.executePlan(explain).executedPlan.executeCollect().foreach {
 +    spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index c24e474..0d5dc7a 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -59,7 +59,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = 
SparkSQLEnv.sqlCont
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.executePlan(context.sql(command).logicalPlan)
 +      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
       hiveResponse = execution.hiveResultString()
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b377a20..ea721e4 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -177,8 +177,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
 -        case None => SubqueryAlias(table.identifier.table, sparkSession.parseSql(viewText))
 -        case Some(aliasText) => SubqueryAlias(aliasText, sparkSession.parseSql(viewText))
+        case None =>
+          SubqueryAlias(table.identifier.table,
+            sparkSession.sessionState.sqlParser.parsePlan(viewText))
+        case Some(aliasText) =>
+          SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText))
       }
     } else {
       MetastoreRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
index 3fc9009..cfe6149 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
@@ -86,7 +86,7 @@ case class CreateTableAsSelectCommand(
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sparkSession.executePlan(InsertIntoTable(
+      sparkSession.sessionState.executePlan(InsertIntoTable(
         metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3805674..9e8ff93 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -296,7 +296,7 @@ case class InsertIntoHiveTable(
     }
 
     // Invalidate the cache.
-    sqlContext.cacheManager.invalidateCache(table)
+    sqlContext.sharedState.cacheManager.invalidateCache(table)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index 7c74a03..dc8f374 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -130,7 +130,7 @@ class ErrorPositionSuite extends QueryTest with 
TestHiveSingleton with BeforeAnd
    * @param token a unique token in the string that should be indicated by the exception
    */
   def positionTest(name: String, query: String, token: String): Unit = {
-    def ast = hiveContext.parseSql(query)
+    def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
     def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
 
     test(name) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index dedc8f5..f789d88 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -279,13 +279,13 @@ class ShowCreateTableSuite extends QueryTest with 
SQLTestUtils with TestHiveSing
 
   private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = {
     val db = table.database.getOrElse("default")
-    val expected = spark.externalCatalog.getTable(db, table.table)
+    val expected = spark.sharedState.externalCatalog.getTable(db, table.table)
     val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0)
     sql(s"DROP $checkType ${table.quotedString}")
 
     try {
       sql(shownDDL)
-      val actual = spark.externalCatalog.getTable(db, table.table)
+      val actual = spark.sharedState.externalCatalog.getTable(db, table.table)
       checkCatalogTables(expected, actual)
     } finally {
       sql(s"DROP $checkType IF EXISTS ${table.table}")

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index f8e00a3..73b1a78 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils
 
   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = hiveContext.parseSql(analyzeCommand)
+      val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTableCommand => a
         case o => o

http://git-wip-us.apache.org/repos/asf/spark/blob/0f61d6ef/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
index f5cd73d..1583a44 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
@@ -30,9 +30,9 @@ class ConcurrentHiveSuite extends SparkFunSuite with 
BeforeAndAfterAll {
         conf.set("spark.ui.enabled", "false")
         val ts =
           new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", conf))
-        ts.executeSql("SHOW TABLES").toRdd.collect()
-        ts.executeSql("SELECT * FROM src").toRdd.collect()
-        ts.executeSql("SHOW TABLES").toRdd.collect()
+        ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
+        ts.sessionState.executeSql("SELECT * FROM src").toRdd.collect()
+        ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
       }
     }
   }

