Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4173#discussion_r23627816
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---
    @@ -0,0 +1,606 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +import scala.collection.JavaConversions._
    +
    +import java.util.{ArrayList, List => JList}
    +
    +import com.fasterxml.jackson.core.JsonFactory
    +import net.razorvine.pickle.Pickler
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.api.java.JavaRDD
    +import org.apache.spark.api.python.SerDeUtil
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.sql.catalyst.ScalaReflection
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
    +import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
    +import org.apache.spark.sql.json.JsonRDD
    +import org.apache.spark.sql.types.{NumericType, StructType}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * A collection of rows that have the same columns.
    + *
    + * A [[DataFrame]] is equivalent to a relational table in Spark SQL, and 
can be created using
    + * various functions in [[SQLContext]].
    + * {{{
    + *   val people = sqlContext.parquetFile("...")
    + * }}}
    + *
    + * Once created, it can be manipulated using the various 
domain-specific-language (DSL) functions
    + * defined in: [[DataFrame]] (this class), [[Column]], and [[dsl]] for 
Scala DSL.
    + *
    + * To select a column from the data frame, use the apply method:
    + * {{{
    + *   val ageCol = people("age")  // in Scala
    + *   Column ageCol = people.apply("age")  // in Java
    + * }}}
    + *
    + * Note that the [[Column]] type can also be manipulated through its 
various functions.
    + * {{{
    + *   // The following creates a new column that increases everybody's age 
by 10.
    + *   people("age") + 10  // in Scala
    + * }}}
    + *
    + * A more concrete example:
    + * {{{
    + *   // To create DataFrame using SQLContext
    + *   val people = sqlContext.parquetFile("...")
    + *   val department = sqlContext.parquetFile("...")
    + *
    + *   people.filter("age" > 30)
    + *     .join(department, people("deptId") === department("id"))
    + *     .groupBy(department("name"), "gender")
    + *     .agg(avg(people("salary")), max(people("age")))
    + * }}}
    + */
    +// TODO: Improve documentation.
    +class DataFrame protected[sql](
    +    val sqlContext: SQLContext,
    +    private val baseLogicalPlan: LogicalPlan,
    +    operatorsEnabled: Boolean)
    +  extends DataFrameSpecificApi with RDDApi[Row] {
    +
    +  protected[sql] def this(sqlContext: Option[SQLContext], plan: 
Option[LogicalPlan]) =
    +    this(sqlContext.orNull, plan.orNull, sqlContext.isDefined && 
plan.isDefined)
    +
    +  protected[sql] def this(sqlContext: SQLContext, plan: LogicalPlan) = 
this(sqlContext, plan, true)
    +
    +  @transient protected[sql] lazy val queryExecution = 
sqlContext.executePlan(baseLogicalPlan)
    +
    +  @transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan 
match {
    +    // For various commands (like DDL) and queries with side effects, we 
force query optimization to
    +    // happen right away to let these side effects take place eagerly.
    +    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: 
WriteToFile =>
    +      LogicalRDD(queryExecution.analyzed.output, 
queryExecution.toRdd)(sqlContext)
    +    case _ =>
    +      baseLogicalPlan
    +  }
    +
    +  /**
    +   * An implicit conversion function internal to this class for us to 
avoid doing
    +   * "new DataFrame(...)" everywhere.
    +   */
    +  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): 
DataFrame = {
    +    new DataFrame(sqlContext, logicalPlan, true)
    +  }
    +
    +  /** Return the list of numeric columns, useful for doing aggregation. */
    +  protected[sql] def numericColumns: Seq[Expression] = {
    +    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
    +      logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
    +    }
    +  }
    +
    +  /** Resolve a column name into a Catalyst [[NamedExpression]]. */
    +  protected[sql] def resolve(colName: String): NamedExpression = {
    +    logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(
    +      throw new RuntimeException(s"""Cannot resolve column name 
"$colName""""))
    +  }
    +
    +  /** Left here for compatibility reasons. */
    +  @deprecated("1.3.0", "use toDataFrame")
    +  def toSchemaRDD: DataFrame = this
    +
    +  /**
    +   * Return the object itself. Used to force an implicit conversion from 
RDD to DataFrame in Scala.
    +   */
    +  def toDF: DataFrame = this
    +
    +  /** Return the schema of this [[DataFrame]]. */
    +  override def schema: StructType = queryExecution.analyzed.schema
    --- End diff --
    
    Can we add a new higher-level type for `schema` as well? It is painful as 
a user to dig into `StructType` etc. Similarly, while applying a schema to an 
RDD, it would be good to have a higher-level type / constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to