Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4173#discussion_r23556987
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---
    @@ -0,0 +1,563 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +import com.fasterxml.jackson.core.JsonFactory
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.sql.catalyst.ScalaReflection
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
    +import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.LogicalRDD
    +import org.apache.spark.sql.json.JsonRDD
    +import org.apache.spark.sql.types.{NumericType, StructType}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * A collection of rows that have the same columns.
    + *
    + * A [[DataFrame]] is equivalent to a relational table in Spark SQL, and 
can be created using
    + * various functions in [[SQLContext]].
    + * {{{
    + *   val people = sqlContext.parquetFile("...")
    + * }}}
    + *
    + * Once created, it can be manipulated using the various 
domain-specific-language (DSL) functions
    + * defined in: [[DataFrame]] (this class), [[Column]], and [[dsl]] for 
Scala DSL.
    + *
    + * To select a column from the data frame, use the apply method:
    + * {{{
    + *   val ageCol = people("age")  // in Scala
    + *   Column ageCol = people.apply("age")  // in Java
    + * }}}
    + *
    + * Note that the [[Column]] type can also be manipulated through its 
various functions.
    + * {{{
    + *   // The following creates a new column that increases everybody's age 
by 10.
    + *   people("age") + 10  // in Scala
    + * }}}
    + *
    + * A more concrete example:
    + * {{{
    + *   // To create DataFrame using SQLContext
    + *   val people = sqlContext.parquetFile("...")
    + *   val department = sqlContext.parquetFile("...")
    + *
    + *   people.filter("age" > 30)
    + *     .join(department, people("deptId") === department("id"))
    + *     .groupby(department("name"), "gender")
    + *     .agg(avg(people("salary")), max(people("age")))
    + * }}}
    + */
    +// TODO: Improve documentation.
    +class DataFrame protected[sql](
    +    val sqlContext: SQLContext,
    +    private val baseLogicalPlan: LogicalPlan,
    +    operatorsEnabled: Boolean)
    +  extends DataFrameSpecificApi with RDDApi[Row] {
    +
    +  protected[sql] def this(sqlContext: Option[SQLContext], plan: 
Option[LogicalPlan]) =
    +    this(sqlContext.orNull, plan.orNull, sqlContext.isDefined && 
plan.isDefined)
    +
    +  protected[sql] def this(sqlContext: SQLContext, plan: LogicalPlan) = 
this(sqlContext, plan, true)
    +
    +  @transient protected[sql] lazy val queryExecution = 
sqlContext.executePlan(baseLogicalPlan)
    +
    +  @transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan 
match {
    +    // For various commands (like DDL) and queries with side effects, we 
force query optimization to
    +    // happen right away to let these side effects take place eagerly.
    +    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: 
WriteToFile =>
    +      LogicalRDD(queryExecution.analyzed.output, 
queryExecution.toRdd)(sqlContext)
    +    case _ =>
    +      baseLogicalPlan
    +  }
    +
    +  /**
    +   * An implicit conversion function internal to this class for us to 
avoid doing
    +   * "new DataFrame(...)" everywhere.
    +   */
    +  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): 
DataFrame = {
    +    new DataFrame(sqlContext, logicalPlan, true)
    +  }
    +
    +  /** Return the list of numeric columns, useful for doing aggregation. */
    +  protected[sql] def numericColumns: Seq[Expression] = {
    +    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
    +      logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
    +    }
    +  }
    +
    +  /** Resolve a column name into a Catalyst [[NamedExpression]]. */
    +  protected[sql] def resolve(colName: String): NamedExpression = {
    +    logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(
    +      throw new RuntimeException(s"""Cannot resolve column name 
"$colName""""))
    +  }
    +
    +  /** Left here for compatibility reasons. */
    +  @deprecated("1.3.0", "use toDataFrame")
    +  def toSchemaRDD: DataFrame = this
    +
    +  /**
    +   * Return the object itself. Used to force an implicit conversion from 
RDD to DataFrame in Scala.
    +   */
    +  def toDF: DataFrame = this
    +
    +  /** Return the schema of this [[DataFrame]]. */
    +  override def schema: StructType = queryExecution.analyzed.schema
    +
    +  /** Return all column names and their data types as an array. */
    +  override def dtypes: Array[(String, String)] = schema.fields.map { field 
=>
    +    (field.name, field.dataType.toString)
    +  }
    +
    +  /** Return all column names as an array. */
    +  override def columns: Array[String] = schema.fields.map(_.name)
    +
    +  /** Print the schema to the console in a nice tree format. */
    +  override def printSchema(): Unit = println(schema.treeString)
    +
    +  /**
    +   * Cartesian join with another [[DataFrame]].
    +   *
    +   * Note that cartesian joins are very expensive without an extra filter 
that can be pushed down.
    +   *
    +   * @param right Right side of the join operation.
    +   */
    +  override def join(right: DataFrame): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
    +  }
    +
    +  /**
    +   * Inner join with another [[DataFrame]], using the given join 
expression.
    +   *
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df1.join(df2, $"df1Key" === $"df2Key")
    +   *   df1.join(df2).where($"df1Key" === $"df2Key")
    +   * }}}
    +   */
    +  override def join(right: DataFrame, joinExprs: Column): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, Inner, Some(joinExprs.expr))
    +  }
    +
    +  /**
    +   * Join with another [[DataFrame]], using the given join expression. 
The following performs
    +   * a full outer join between `df1` and `df2`.
    +   *
    +   * {{{
    +   *   df1.join(df2, "outer", $"df1Key" === $"df2Key")
    +   * }}}
    +   *
    +   * @param right Right side of the join.
    +   * @param joinExprs Join expression.
    +   * @param joinType One of: `inner`, `outer`, `left_outer`, 
`right_outer`, `semijoin`.
    +   */
    +  override def join(right: DataFrame, joinExprs: Column, joinType: 
String): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, JoinType(joinType), 
Some(joinExprs.expr))
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the specified column, in 
ascending order.
    +   * {{{
    +   *   // The following 3 are equivalent
    +   *   df.sort("sortcol")
    +   *   df.sort($"sortcol")
    +   *   df.sort($"sortcol".asc)
    +   * }}}
    +   */
    +  override def sort(colName: String): DataFrame = {
    +    Sort(Seq(SortOrder(apply(colName).expr, Ascending)), global = true, 
logicalPlan)
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the given expressions. For 
example:
    +   * {{{
    +   *   df.sort($"col1", $"col2".desc)
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def sort(sortExpr: Column, sortExprs: Column*): DataFrame = {
    +    val sortOrder: Seq[SortOrder] = (sortExpr +: sortExprs).map { col =>
    +      col.expr match {
    +        case expr: SortOrder =>
    +          expr
    +        case expr: Expression =>
    +          SortOrder(expr, Ascending)
    +      }
    +    }
    +    Sort(sortOrder, global = true, logicalPlan)
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the given expressions.
    +   * This is an alias of the `sort` function.
    +   */
    +  @scala.annotation.varargs
    +  override def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame = {
    +    sort(sortExpr, sortExprs :_*)
    +  }
    +
    +  /**
    +   * Selecting a single column and return it as a [[Column]].
    +   */
    +  override def apply(colName: String): Column = {
    +    val expr = resolve(colName)
    +    new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), 
expr)
    +  }
    +
    +  /**
    +   * Selecting a set of expressions, wrapped in a Product.
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df.apply(($"colA", $"colB" + 1))
    +   *   df.select($"colA", $"colB" + 1)
    +   * }}}
    +   */
    +  override def apply(projection: Product): DataFrame = {
    +    require(projection.productArity >= 1)
    +    select(projection.productIterator.map {
    +      case c: Column => c
    +      case o: Any => new Column(Some(sqlContext), None, LiteralExpr(o))
    +    }.toSeq :_*)
    +  }
    +
    +  /**
    +   * Alias the current [[DataFrame]].
    +   */
    +  override def as(name: String): DataFrame = Subquery(name, logicalPlan)
    +
    +  /**
    +   * Selecting a set of expressions.
    +   * {{{
    +   *   df.select($"colA", $"colB" + 1)
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def select(cols: Column*): DataFrame = {
    +    val exprs = cols.zipWithIndex.map {
    +      case (Column(expr: NamedExpression), _) =>
    +        expr
    +      case (Column(expr: Expression), _) =>
    +        Alias(expr, expr.toString)()
    +    }
    +    Project(exprs.toSeq, logicalPlan)
    +  }
    +
    +  /**
    +   * Selecting a set of columns. This is a variant of `select` that can 
only select
    +   * existing columns using column names (i.e. cannot construct 
expressions).
    +   *
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df.select("colA", "colB")
    +   *   df.select($"colA", $"colB")
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def select(col: String, cols: String*): DataFrame = {
    +    select((col +: cols).map(new Column(_)) :_*)
    +  }
    +
    +  /**
    +   * Filtering rows using the given condition.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDf.filter($"age" > 15)
    +   *   peopleDf.where($"age" > 15)
    +   *   peopleDf($"age > 15)
    --- End diff --
    
    Missing closing quote in the last example? Presumably it should read `peopleDf($"age" > 15)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to