Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4173#discussion_r23590914
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---
    @@ -0,0 +1,598 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +import scala.collection.JavaConversions._
    +
    +import java.util.{ArrayList, List => JList}
    +
    +import com.fasterxml.jackson.core.JsonFactory
    +import net.razorvine.pickle.Pickler
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.api.java.JavaRDD
    +import org.apache.spark.api.python.SerDeUtil
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.sql.catalyst.ScalaReflection
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
    +import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
    +import org.apache.spark.sql.json.JsonRDD
    +import org.apache.spark.sql.types.{NumericType, StructType}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * A collection of rows that have the same columns.
    + *
    + * A [[DataFrame]] is equivalent to a relational table in Spark SQL, and 
can be created using
    + * various functions in [[SQLContext]].
    + * {{{
    + *   val people = sqlContext.parquetFile("...")
    + * }}}
    + *
    + * Once created, it can be manipulated using the various 
domain-specific-language (DSL) functions
    + * defined in: [[DataFrame]] (this class), [[Column]], and [[dsl]] for 
Scala DSL.
    + *
    + * To select a column from the data frame, use the apply method:
    + * {{{
    + *   val ageCol = people("age")  // in Scala
    + *   Column ageCol = people.apply("age")  // in Java
    + * }}}
    + *
    + * Note that the [[Column]] type can also be manipulated through its 
various functions.
    + * {{
    + *   // The following creates a new column that increases everybody's age 
by 10.
    + *   people("age") + 10  // in Scala
    + * }}
    + *
    + * A more concrete example:
    + * {{{
    + *   // To create DataFrame using SQLContext
    + *   val people = sqlContext.parquetFile("...")
    + *   val department = sqlContext.parquetFile("...")
    + *
    + *   people.filter("age" > 30)
    + *     .join(department, people("deptId") === department("id"))
    + *     .groupby(department("name"), "gender")
    + *     .agg(avg(people("salary")), max(people("age")))
    + * }}}
    + */
    +// TODO: Improve documentation.
    +class DataFrame protected[sql](
    +    val sqlContext: SQLContext,
    +    private val baseLogicalPlan: LogicalPlan,
    +    operatorsEnabled: Boolean)
    +  extends DataFrameSpecificApi with RDDApi[Row] {
    +
    +  protected[sql] def this(sqlContext: Option[SQLContext], plan: 
Option[LogicalPlan]) =
    +    this(sqlContext.orNull, plan.orNull, sqlContext.isDefined && 
plan.isDefined)
    +
    +  protected[sql] def this(sqlContext: SQLContext, plan: LogicalPlan) = 
this(sqlContext, plan, true)
    +
    +  @transient protected[sql] lazy val queryExecution = 
sqlContext.executePlan(baseLogicalPlan)
    +
    +  @transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan 
match {
    +    // For various commands (like DDL) and queries with side effects, we 
force query optimization to
    +    // happen right away to let these side effects take place eagerly.
    +    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: 
WriteToFile =>
    +      LogicalRDD(queryExecution.analyzed.output, 
queryExecution.toRdd)(sqlContext)
    +    case _ =>
    +      baseLogicalPlan
    +  }
    +
    +  /**
    +   * An implicit conversion function internal to this class for us to 
avoid doing
    +   * "new DataFrame(...)" everywhere.
    +   */
    +  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): 
DataFrame = {
    +    new DataFrame(sqlContext, logicalPlan, true)
    +  }
    +
    +  /** Return the list of numeric columns, useful for doing aggregation. */
    +  protected[sql] def numericColumns: Seq[Expression] = {
    +    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
    +      logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
    +    }
    +  }
    +
    +  /** Resolve a column name into a Catalyst [[NamedExpression]]. */
    +  protected[sql] def resolve(colName: String): NamedExpression = {
    +    logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(
    +      throw new RuntimeException(s"""Cannot resolve column name 
"$colName""""))
    +  }
    +
    +  /** Left here for compatibility reasons. */
    +  @deprecated("1.3.0", "use toDataFrame")
    +  def toSchemaRDD: DataFrame = this
    +
    +  /**
    +   * Return the object itself. Used to force an implicit conversion from 
RDD to DataFrame in Scala.
    +   */
    +  def toDF: DataFrame = this
    +
    +  /** Return the schema of this [[DataFrame]]. */
    +  override def schema: StructType = queryExecution.analyzed.schema
    +
    +  /** Return all column names and their data types as an array. */
    +  override def dtypes: Array[(String, String)] = schema.fields.map { field 
=>
    +    (field.name, field.dataType.toString)
    +  }
    +
    +  /** Return all column names as an array. */
    +  override def columns: Array[String] = schema.fields.map(_.name)
    +
    +  /** Print the schema to the console in a nice tree format. */
    +  override def printSchema(): Unit = println(schema.treeString)
    +
    +  /**
    +   * Cartesian join with another [[DataFrame]].
    +   *
    +   * Note that cartesian joins are very expensive without an extra filter 
that can be pushed down.
    +   *
    +   * @param right Right side of the join operation.
    +   */
    +  override def join(right: DataFrame): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
    +  }
    +
    +  /**
    +   * Inner join with another [[DataFrame]], using the given join 
expression.
    +   *
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df1.join(df2, $"df1Key" === $"df2Key")
    +   *   df1.join(df2).where($"df1Key" === $"df2Key")
    +   * }}}
    +   */
    +  override def join(right: DataFrame, joinExprs: Column): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, Inner, Some(joinExprs.expr))
    +  }
    +
    +  /**
    +   * Join with another [[DataFrame]], usin  g the given join expression. 
The following performs
    +   * a full outer join between `df1` and `df2`.
    +   *
    +   * {{{
    +   *   df1.join(df2, "outer", $"df1Key" === $"df2Key")
    +   * }}}
    +   *
    +   * @param right Right side of the join.
    +   * @param joinExprs Join expression.
    +   * @param joinType One of: `inner`, `outer`, `left_outer`, 
`right_outer`, `semijoin`.
    +   */
    +  override def join(right: DataFrame, joinExprs: Column, joinType: 
String): DataFrame = {
    +    Join(logicalPlan, right.logicalPlan, JoinType(joinType), 
Some(joinExprs.expr))
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the specified column, in 
ascending column.
    +   * {{{
    +   *   // The following 3 are equivalent
    +   *   df.sort("sortcol")
    +   *   df.sort($"sortcol")
    +   *   df.sort($"sortcol".asc)
    +   * }}}
    +   */
    +  override def sort(colName: String): DataFrame = {
    +    Sort(Seq(SortOrder(apply(colName).expr, Ascending)), global = true, 
logicalPlan)
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the given expressions. For 
example:
    +   * {{{
    +   *   df.sort($"col1", $"col2".desc)
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def sort(sortExpr: Column, sortExprs: Column*): DataFrame = {
    +    val sortOrder: Seq[SortOrder] = (sortExpr +: sortExprs).map { col =>
    +      col.expr match {
    +        case expr: SortOrder =>
    +          expr
    +        case expr: Expression =>
    +          SortOrder(expr, Ascending)
    +      }
    +    }
    +    Sort(sortOrder, global = true, logicalPlan)
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] sorted by the given expressions.
    +   * This is an alias of the `sort` function.
    +   */
    +  @scala.annotation.varargs
    +  override def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame = {
    +    sort(sortExpr, sortExprs :_*)
    +  }
    +
    +  /**
    +   * Selecting a single column and return it as a [[Column]].
    +   */
    +  override def apply(colName: String): Column = {
    +    val expr = resolve(colName)
    +    new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), 
expr)
    +  }
    +
    +  /**
    +   * Selecting a set of expressions, wrapped in a Product.
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df.apply(($"colA", $"colB" + 1))
    +   *   df.select($"colA", $"colB" + 1)
    +   * }}}
    +   */
    +  override def apply(projection: Product): DataFrame = {
    +    require(projection.productArity >= 1)
    +    select(projection.productIterator.map {
    +      case c: Column => c
    +      case o: Any => new Column(Some(sqlContext), None, LiteralExpr(o))
    +    }.toSeq :_*)
    +  }
    +
    +  /**
    +   * Alias the current [[DataFrame]].
    +   */
    +  override def as(name: String): DataFrame = Subquery(name, logicalPlan)
    +
    +  /**
    +   * Selecting a set of expressions.
    +   * {{{
    +   *   df.select($"colA", $"colB" + 1)
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def select(cols: Column*): DataFrame = {
    +    val exprs = cols.zipWithIndex.map {
    +      case (Column(expr: NamedExpression), _) =>
    +        expr
    +      case (Column(expr: Expression), _) =>
    +        Alias(expr, expr.toString)()
    +    }
    +    Project(exprs.toSeq, logicalPlan)
    +  }
    +
    +  /**
    +   * Selecting a set of columns. This is a variant of `select` that can 
only select
    +   * existing columns using column names (i.e. cannot construct 
expressions).
    +   *
    +   * {{{
    +   *   // The following two are equivalent:
    +   *   df.select("colA", "colB")
    +   *   df.select($"colA", $"colB")
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def select(col: String, cols: String*): DataFrame = {
    +    select((col +: cols).map(new Column(_)) :_*)
    +  }
    +
    +  /**
    +   * Filtering rows using the given condition.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDf.filter($"age" > 15)
    +   *   peopleDf.where($"age" > 15)
    +   *   peopleDf($"age > 15)
    +   * }}}
    +   */
    +  override def filter(condition: Column): DataFrame = {
    +    Filter(condition.expr, logicalPlan)
    +  }
    +
    +  /**
    +   * Filtering rows using the given condition. This is an alias for 
`filter`.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDf.filter($"age" > 15)
    +   *   peopleDf.where($"age" > 15)
    +   *   peopleDf($"age > 15)
    +   * }}}
    +   */
    +  override def where(condition: Column): DataFrame = filter(condition)
    +
    +  /**
    +   * Filtering rows using the given condition. This is a shorthand meant 
for Scala.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDf.filter($"age" > 15)
    +   *   peopleDf.where($"age" > 15)
    +   *   peopleDf($"age > 15)
    +   * }}}
    +   */
    +  override def apply(condition: Column): DataFrame = filter(condition)
    +
    +  /**
    +   * Group the [[DataFrame]] using the specified columns, so we can run 
aggregation on them.
    +   * See [[GroupedDataFrame]] for all the available aggregate functions.
    +   *
    +   * {{{
    +   *   // Compute the average for all numeric columns grouped by 
department.
    +   *   df.groupby($"department").avg()
    +   *
    +   *   // Compute the max age and average salary, grouped by department 
and gender.
    +   *   df.groupby($"department", $"gender").agg(Map(
    +   *     "salary" -> "avg",
    +   *     "age" -> "max"
    +   *   ))
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def groupby(cols: Column*): GroupedDataFrame = {
    +    new GroupedDataFrame(this, cols.map(_.expr))
    +  }
    +
    +  /**
    +   * Group the [[DataFrame]] using the specified columns, so we can run 
aggregation on them.
    +   * See [[GroupedDataFrame]] for all the available aggregate functions.
    +   *
    +   * This is a variant of groupby that can only group by existing columns 
using column names
    +   * (i.e. cannot construct expressions).
    +   *
    +   * {{{
    +   *   // Compute the average for all numeric columns grouped by 
department.
    +   *   df.groupby("department").avg()
    +   *
    +   *   // Compute the max age and average salary, grouped by department 
and gender.
    +   *   df.groupby($"department", $"gender").agg(Map(
    +   *     "salary" -> "avg",
    +   *     "age" -> "max"
    +   *   ))
    +   * }}}
    +   */
    +  @scala.annotation.varargs
    +  override def groupby(col1: String, cols: String*): GroupedDataFrame = {
    +    val colNames: Seq[String] = col1 +: cols
    +    new GroupedDataFrame(this, colNames.map(colName => resolve(colName)))
    +  }
    +
    +  /**
    +   * Aggregate on the entire [[DataFrame]] without groups.
    +   * {{
    +   *   // df.agg(...) is a shorthand for df.groupby().agg(...)
    +   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
    +   *   df.groupby().agg(Map("age" -> "max", "salary" -> "avg"))
    +   * }}
    +   */
    +  override def agg(exprs: Map[String, String]): DataFrame = 
groupby().agg(exprs)
    +
    +  /**
    +   * Aggregate on the entire [[DataFrame]] without groups.
    +   * {{
    +   *   // df.agg(...) is a shorthand for df.groupby().agg(...)
    +   *   df.agg(max($"age"), avg($"salary"))
    +   *   df.groupby().agg(max($"age"), avg($"salary"))
    +   * }}
    +   */
    +  @scala.annotation.varargs
    +  override def agg(expr: Column, exprs: Column*): DataFrame = 
groupby().agg(expr, exprs :_*)
    +
    +  /**
    +   * Return a new [[DataFrame]] by taking the first `n` rows. The 
difference between this function
    +   * and `head` is that `head` returns an array while `limit` returns a 
new [[DataFrame]].
    +   */
    +  override def limit(n: Int): DataFrame = Limit(LiteralExpr(n), 
logicalPlan)
    +
    +  /**
    +   * Return a new [[DataFrame]] containing union of rows in this frame and 
another frame.
    +   * This is equivalent to `UNION ALL` in SQL.
    +   */
    +  override def unionAll(other: DataFrame): DataFrame = Union(logicalPlan, 
other.logicalPlan)
    +
    +  /**
    +   * Return a new [[DataFrame]] containing rows only in both this frame 
and another frame.
    +   * This is equivalent to `INTERSECT` in SQL.
    +   */
    +  override def intersect(other: DataFrame): DataFrame = 
Intersect(logicalPlan, other.logicalPlan)
    +
    +  /**
    +   * Return a new [[DataFrame]] containing rows in this frame but not in 
another frame.
    +   * This is equivalent to `EXCEPT` in SQL.
    +   */
    +  override def except(other: DataFrame): DataFrame = Except(logicalPlan, 
other.logicalPlan)
    +
    +  /**
    +   * Return a new [[DataFrame]] by sampling a fraction of rows.
    +   *
    +   * @param withReplacement Sample with replacement or not.
    +   * @param fraction Fraction of rows to generate.
    +   * @param seed Seed for sampling.
    +   */
    +  override def sample(withReplacement: Boolean, fraction: Double, seed: 
Long): DataFrame = {
    +    Sample(fraction, withReplacement, seed, logicalPlan)
    +  }
    +
    +  /**
    +   * Return a new [[DataFrame]] by sampling a fraction of rows, using a 
random seed.
    +   *
    +   * @param withReplacement Sample with replacement or not.
    +   * @param fraction Fraction of rows to generate.
    +   */
    +  override def sample(withReplacement: Boolean, fraction: Double): 
DataFrame = {
    +    sample(withReplacement, fraction, Utils.random.nextLong)
    +  }
    +
    +  
/////////////////////////////////////////////////////////////////////////////
    +
    +  /**
    +   * Return a new [[DataFrame]] by adding a column.
    +   */
    +  override def addColumn(colName: String, col: Column): DataFrame = {
    +    select(Column("*"), col.as(colName))
    +  }
    +
    +  override def removeColumn(colName: String, col: Column): DataFrame = ???
    --- End diff --
    
    probalby should be cols


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to