Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17433#discussion_r108091528
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
    @@ -179,88 +132,295 @@ private[sql] class SessionState(
       }
     }
     
    -
     private[sql] object SessionState {
    +  /**
    +   * Create a new [[SessionState]] for the given session.
    +   */
    +  def apply(session: SparkSession): SessionState = {
    +    new SessionStateBuilder(session).build
    +  }
    +
    +  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
    +    newHadoopConf
    +  }
    +}
    +
    +/**
    + * Builder class that coordinates construction of a new [[SessionState]].
    + *
    + * The builder explicitly defines all components needed by the session 
state, and creates a session
    + * state when `build` is called. Components should only be initialized 
once. This is not a problem
    + * for most components as they are only used in the `build` function. 
However some components
    + * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are used as dependencies
    + * for other components and are shared as a result. These components are 
defined as lazy vals to
    + * make sure the component is created only once.
    + *
    + * A developer can modify the builder by providing custom versions of 
components, or by using the
    + * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
    + * components (they are documented per dependency), a developer should 
respect these when making
    + * modifications in order to prevent initialization problems.
    + *
    + * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
    + * state will clone the parent session state's `conf`, 
`functionRegistry`, `experimentalMethods`
    + * and `catalog` fields. Note that the state is cloned when `build` is 
called, and not before.
    + */
    +@Experimental
    +@InterfaceStability.Unstable
    +abstract class BaseSessionStateBuilder(
    +    val session: SparkSession,
    +    val parentState: Option[SessionState] = None) {
    +  type NewBuilder = (SparkSession, Option[SessionState]) => 
BaseSessionStateBuilder
     
    -  def apply(sparkSession: SparkSession): SessionState = {
    -    apply(sparkSession, new SQLConf)
    +  /**
    +   * Extract entries from `SparkConf` and put them in the `SQLConf`
    +   */
    +  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): 
Unit = {
    +    sparkConf.getAll.foreach { case (k, v) =>
    +      sqlConf.setConfString(k, v)
    +    }
       }
     
    -  def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
    -    val sparkContext = sparkSession.sparkContext
    +  /**
    +   * SQL-specific key-value configurations.
    +   *
    +   * These either get cloned from a pre-existing instance or newly 
created. The conf is always
    +   * merged with its [[SparkConf]].
    +   */
    +  protected lazy val conf: SQLConf = {
    +    val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
    +    mergeSparkConf(conf, session.sparkContext.conf)
    +    conf
    +  }
     
    -    // Automatically extract all entries and put them in our SQLConf
    -    mergeSparkConf(sqlConf, sparkContext.getConf)
    +  /**
    +   * Internal catalog managing functions registered by the user.
    +   *
    +   * This either gets cloned from a pre-existing version or cloned from 
the built-in registry.
    +   */
    +  protected lazy val functionRegistry: FunctionRegistry = {
    +    
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
    +  }
     
    -    val functionRegistry = FunctionRegistry.builtin.clone()
    +  /**
    +   * Experimental methods that can be used to define custom optimization 
rules and custom planning
    +   * strategies.
    +   *
    +   * This either gets cloned from a pre-existing version or newly created.
    +   */
    +  protected lazy val experimentalMethods: ExperimentalMethods = {
    +    parentState.map(_.experimentalMethods.clone()).getOrElse(new 
ExperimentalMethods)
    +  }
     
    -    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
    +  /**
    +   * Parser that extracts expressions, plans, table identifiers etc. from 
SQL texts.
    +   *
    +   * Note: this depends on the `conf` field.
    +   */
    +  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
     
    +  /**
    +   * Catalog for managing table and database states. If there is a 
pre-existing catalog, the state
    +   * of that catalog (temp tables & current database) will be copied into 
the new catalog.
    +   *
    +   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` 
fields.
    +   */
    +  protected lazy val catalog: SessionCatalog = {
         val catalog = new SessionCatalog(
    -      sparkSession.sharedState.externalCatalog,
    -      sparkSession.sharedState.globalTempViewManager,
    +      session.sharedState.externalCatalog,
    +      session.sharedState.globalTempViewManager,
           functionRegistry,
    -      sqlConf,
    -      newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
    -      sqlParser)
    +      conf,
    +      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
    +      sqlParser,
    +      new SessionFunctionResourceLoader(session))
    +    parentState.foreach(_.catalog.copyStateTo(catalog))
    +    catalog
    +  }
     
    -    val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
    +  /**
    +   * Logical query plan analyzer for resolving unresolved attributes and 
relations.
    +   *
    +   * Note: this depends on the `conf` and `catalog` fields.
    +   */
    +  protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
    +    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    +      new FindDataSourceTable(session) +:
    +      new ResolveSQLOnFile(session) +:
    +      customResolutionRules
    +
    +    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    +      PreprocessTableCreation(session) +:
    +      PreprocessTableInsertion(conf) +:
    +      DataSourceAnalysis(conf) +:
    +      customPostHocResolutionRules
    +
    +    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    +      PreWriteCheck +:
    +      HiveOnlyCheck +:
    +      customCheckRules
    +  }
     
    -    val streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(sparkSession)
    +  /**
    +   * Custom resolution rules to add to the Analyzer. Prefer overriding 
this instead of creating
    +   * your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
     
    -    val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(sparkSession, plan)
    +  /**
    +   * Custom post resolution rules to add to the Analyzer. Prefer 
overriding this instead of
    +   * creating your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
    +
    +  /**
    +   * Custom check rules to add to the Analyzer. Prefer overriding this 
instead of creating
    +   * your own Analyzer.
    +   *
    +   * Note that this may NOT depend on the `analyzer` function.
    +   */
    +  protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
    +
    +  /**
    +   * Logical query plan optimizer.
    +   *
    +   * Note: this depends on the `conf`, `catalog` and `experimentalMethods` 
fields.
    +   */
    +  protected def optimizer: Optimizer = {
    +    new SparkOptimizer(catalog, conf, experimentalMethods) {
    +      override def extendedOperatorOptimizationRules: 
Seq[Rule[LogicalPlan]] =
    +        super.extendedOperatorOptimizationRules ++ 
customOperatorOptimizationRules
    +    }
    +  }
    +
    +  /**
    +   * Custom operator optimization rules to add to the Optimizer. Prefer 
overriding this instead
    +   * of creating your own Optimizer.
    +   *
    +   * Note that this may NOT depend on the `optimizer` function.
    +   */
    +  protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = 
Nil
     
    -    val sessionState = new SessionState(
    -      sparkContext,
    -      sparkSession.sharedState,
    -      sqlConf,
    -      new ExperimentalMethods,
    +  /**
    +   * Planner that converts optimized logical plans to physical plans.
    +   *
    +   * Note: this depends on the `conf` and `experimentalMethods` fields.
    +   */
    +  protected def planner: SparkPlanner = {
    +    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    +      override def extraPlanningStrategies: Seq[Strategy] =
    +        super.extraPlanningStrategies ++ customPlanningStrategies
    +    }
    +  }
    +
    +  /**
    +   * Custom strategies to add to the planner. Prefer overriding this 
instead of creating
    +   * your own Planner.
    +   *
    +   * Note that this may NOT depend on the `planner` function.
    +   */
    +  protected def customPlanningStrategies: Seq[Strategy] = Nil
    +
    +  /**
    +   * Create a query execution object.
    +   */
    +  protected def createQueryExecution: LogicalPlan => QueryExecution = { 
plan =>
    +    new QueryExecution(session, plan)
    +  }
    +
    +  /**
    +   * Interface to start and stop streaming queries.
    +   */
    +  protected def streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(session)
    +
    +  /**
    +   * Function that produces a new instance of the SessionStateBuilder. 
This is used by the
    +   * [[SessionState]]'s clone functionality. Make sure to override this 
when implementing your own
    +   * [[SessionStateBuilder]].
    +   */
    +  protected def newBuilder: NewBuilder
    +
    +  /**
    +   * Function used to make clones of the session state.
    +   */
    +  protected def createClone: (SparkSession, SessionState) => SessionState 
= {
    +    val createBuilder = newBuilder
    +    (session, state) => createBuilder(session, Option(state)).build
    +  }
    +
    +  /**
    +   * Build the [[SessionState]].
    +   */
    +  def build: SessionState = {
    +    new SessionState(
    +      session.sparkContext,
    +      session.sharedState,
    +      conf,
    +      experimentalMethods,
           functionRegistry,
           catalog,
           sqlParser,
           analyzer,
    +      optimizer,
    +      planner,
           streamingQueryManager,
    -      queryExecutionCreator)
    -    // functionResourceLoader needs to access SessionState.addJar, so it 
cannot be created before
    -    // creating SessionState. Setting `catalog.functionResourceLoader` 
here is safe since the caller
    -    // cannot use SessionCatalog before we return SessionState.
    -    catalog.functionResourceLoader = sessionState.functionResourceLoader
    -    sessionState
    +      createQueryExecution,
    +      createClone)
       }
    +}
     
    -  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
    -    val newHadoopConf = new Configuration(hadoopConf)
    -    sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
    -    newHadoopConf
    -  }
    +/**
    + * Concrete implementation of a [[SessionStateBuilder]].
    + */
    +@Experimental
    +@InterfaceStability.Unstable
    +class SessionStateBuilder(
    +    session: SparkSession,
    +    state: Option[SessionState] = None)
    --- End diff --
    
    Keep the original name `parentState`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to