Repository: spark
Updated Branches:
  refs/heads/master c76865c62 -> 6b87acd66


[SPARK-11513][SQL] Remove implicit conversion from LogicalPlan to DataFrame

This internal implicit conversion has been a source of confusion for many new
developers.

Author: Reynold Xin <r...@databricks.com>

Closes #9479 from rxin/SPARK-11513.
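
For context, a minimal sketch of the before/after pattern (simplified
stand-ins, not the actual Spark classes): the removed implicit silently
converted any LogicalPlan into a DataFrame at call sites, while the new
private withPlan helper (added at the bottom of DataFrame.scala in this
diff) makes each construction explicit and greppable.

    object WithPlanSketch {
      class SQLContext
      class LogicalPlan
      case class Filter(child: LogicalPlan) extends LogicalPlan

      class DataFrame(val sqlContext: SQLContext, val logicalPlan: LogicalPlan) {
        // After this change, the wrapping is spelled out at every call site
        // instead of being inserted invisibly by an implicit conversion.
        def filter(): DataFrame = withPlan {
          Filter(logicalPlan)
        }

        /** A convenient function to wrap a logical plan and produce a DataFrame. */
        @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
          new DataFrame(sqlContext, logicalPlan)
        }
      }
    }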


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b87acd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b87acd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b87acd6

Branch: refs/heads/master
Commit: 6b87acd6649a3390b5c2c4fcb61e58d125d0d87c
Parents: c76865c
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Nov 5 11:58:13 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Nov 5 11:58:13 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 123 +++++++++++--------
 .../scala/org/apache/spark/sql/Dataset.scala    |   5 +-
 2 files changed, 78 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b87acd6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d3a2249..6336dee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -147,14 +147,6 @@ class DataFrame private[sql](
       queryExecution.analyzed
   }
 
-  /**
-   * An implicit conversion function internal to this class for us to avoid doing
-   * "new DataFrame(...)" everywhere.
-   */
-  @inline private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
-    new DataFrame(sqlContext, logicalPlan)
-  }
-
   protected[sql] def resolve(colName: String): NamedExpression = {
     queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
       throw new AnalysisException(
@@ -235,7 +227,7 @@ class DataFrame private[sql](
     // For Data that has more than "numRows" records
     if (hasMoreData) {
       val rowsString = if (numRows == 1) "row" else "rows"
-      sb.append(s"only showing top $numRows ${rowsString}\n")
+      sb.append(s"only showing top $numRows $rowsString\n")
     }
 
     sb.toString()
@@ -332,7 +324,7 @@ class DataFrame private[sql](
    */
   def explain(extended: Boolean): Unit = {
     val explain = ExplainCommand(queryExecution.logical, extended = extended)
-    explain.queryExecution.executedPlan.executeCollect().foreach {
+    withPlan(explain).queryExecution.executedPlan.executeCollect().foreach {
       // scalastyle:off println
       r => println(r.getString(0))
       // scalastyle:on println
@@ -370,7 +362,7 @@ class DataFrame private[sql](
    * @group action
    * @since 1.3.0
    */
-  def show(numRows: Int): Unit = show(numRows, true)
+  def show(numRows: Int): Unit = show(numRows, truncate = true)
 
   /**
   * Displays the top 20 rows of [[DataFrame]] in a tabular form. Strings more than 20 characters
@@ -445,7 +437,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def join(right: DataFrame): DataFrame = {
+  def join(right: DataFrame): DataFrame = withPlan {
     Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }
 
@@ -520,21 +512,25 @@ class DataFrame private[sql](
       Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
 
     // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => joined.right.resolve(col))
+    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
     val condition = usingColumns.map { col =>
-      catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
+      catalyst.expressions.EqualTo(
+        withPlan(joined.left).resolve(col),
+        withPlan(joined.right).resolve(col))
     }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
       catalyst.expressions.And(cond, eqTo)
     }
 
-    Project(
-      joined.output.filterNot(joinedCols.contains(_)),
-      Join(
-        joined.left,
-        joined.right,
-        joinType = JoinType(joinType),
-        condition)
-    )
+    withPlan {
+      Project(
+        joined.output.filterNot(joinedCols.contains(_)),
+        Join(
+          joined.left,
+          joined.right,
+          joinType = JoinType(joinType),
+          condition)
+      )
+    }
   }
 
   /**
@@ -581,19 +577,20 @@ class DataFrame private[sql](
 
     // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
     // After the cloning, left and right side will have distinct expression ids.
-    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+    val plan = withPlan(
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
     if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
-      return plan
+      return withPlan(plan)
     }
 
     // If left/right have no output set intersection, return the plan.
-    val lanalyzed = this.logicalPlan.queryExecution.analyzed
-    val ranalyzed = right.logicalPlan.queryExecution.analyzed
+    val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+    val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
     if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
-      return plan
+      return withPlan(plan)
     }
 
     // Otherwise, find the trivially true predicates and automatically resolves them to both sides.
@@ -602,9 +599,14 @@ class DataFrame private[sql](
     val cond = plan.condition.map { _.transform {
       case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
           if a.sameRef(b) =>
-        catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
+        catalyst.expressions.EqualTo(
+          withPlan(plan.left).resolve(a.name),
+          withPlan(plan.right).resolve(b.name))
     }}
-    plan.copy(condition = cond)
+
+    withPlan {
+      plan.copy(condition = cond)
+    }
   }
 
   /**
@@ -707,7 +709,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+  def as(alias: String): DataFrame = withPlan {
+    Subquery(alias, logicalPlan)
+  }
 
   /**
    * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
@@ -739,7 +743,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = {
+  def select(cols: Column*): DataFrame = withPlan {
     val namedExpressions = cols.map {
       // Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we
       // will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
@@ -798,7 +802,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def filter(condition: Column): DataFrame = Filter(condition.expr, logicalPlan)
+  def filter(condition: Column): DataFrame = withPlan {
+    Filter(condition.expr, logicalPlan)
+  }
 
   /**
    * Filters rows using the given SQL expression.
@@ -1039,7 +1045,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def limit(n: Int): DataFrame = Limit(Literal(n), logicalPlan)
+  def limit(n: Int): DataFrame = withPlan {
+    Limit(Literal(n), logicalPlan)
+  }
 
   /**
   * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
@@ -1047,7 +1055,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def unionAll(other: DataFrame): DataFrame = Union(logicalPlan, other.logicalPlan)
+  def unionAll(other: DataFrame): DataFrame = withPlan {
+    Union(logicalPlan, other.logicalPlan)
+  }
 
   /**
   * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
@@ -1055,7 +1065,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def intersect(other: DataFrame): DataFrame = Intersect(logicalPlan, other.logicalPlan)
+  def intersect(other: DataFrame): DataFrame = withPlan {
+    Intersect(logicalPlan, other.logicalPlan)
+  }
 
   /**
   * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
@@ -1063,7 +1075,9 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def except(other: DataFrame): DataFrame = Except(logicalPlan, other.logicalPlan)
+  def except(other: DataFrame): DataFrame = withPlan {
+    Except(logicalPlan, other.logicalPlan)
+  }
 
   /**
    * Returns a new [[DataFrame]] by sampling a fraction of rows.
@@ -1074,7 +1088,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = {
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = withPlan {
     Sample(0.0, fraction, withReplacement, seed, logicalPlan)
   }
 
@@ -1102,7 +1116,7 @@ class DataFrame private[sql](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new DataFrame(sqlContext, Sample(x(0), x(1), false, seed, logicalPlan))
+      new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, logicalPlan))
     }.toArray
   }
 
@@ -1162,8 +1176,10 @@ class DataFrame private[sql](
       f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
     val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
 
-    Generate(generator, join = true, outer = false,
-      qualifier = None, generatorOutput = Nil, logicalPlan)
+    withPlan {
+      Generate(generator, join = true, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
   }
 
   /**
@@ -1190,8 +1206,10 @@ class DataFrame private[sql](
     }
     val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
 
-    Generate(generator, join = true, outer = false,
-      qualifier = None, generatorOutput = Nil, logicalPlan)
+    withPlan {
+      Generate(generator, join = true, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -1309,7 +1327,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.4.0
    */
-  def dropDuplicates(colNames: Seq[String]): DataFrame = {
+  def dropDuplicates(colNames: Seq[String]): DataFrame = withPlan {
     val groupCols = colNames.map(resolve)
     val groupColExprIds = groupCols.map(_.exprId)
     val aggCols = logicalPlan.output.map { attr =>
@@ -1355,7 +1373,7 @@ class DataFrame private[sql](
    * @since 1.3.1
    */
   @scala.annotation.varargs
-  def describe(cols: String*): DataFrame = {
+  def describe(cols: String*): DataFrame = withPlan {
 
     // The list of summary statistics to compute, in the form of expressions.
     val statistics = List[(String, Expression => Expression)](
@@ -1505,7 +1523,7 @@ class DataFrame private[sql](
    * @group dfops
    * @since 1.3.0
    */
-  def repartition(numPartitions: Int): DataFrame = {
+  def repartition(numPartitions: Int): DataFrame = withPlan {
     Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
@@ -1519,7 +1537,7 @@ class DataFrame private[sql](
    * @since 1.6.0
    */
   @scala.annotation.varargs
-  def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
+  def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = withPlan {
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
   }
 
@@ -1533,7 +1551,7 @@ class DataFrame private[sql](
    * @since 1.6.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): DataFrame = {
+  def repartition(partitionExprs: Column*): DataFrame = withPlan {
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
   }
 
@@ -1545,7 +1563,7 @@ class DataFrame private[sql](
    * @group rdd
    * @since 1.4.0
    */
-  def coalesce(numPartitions: Int): DataFrame = {
+  def coalesce(numPartitions: Int): DataFrame = withPlan {
     Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
@@ -2066,7 +2084,14 @@ class DataFrame private[sql](
           SortOrder(expr, Ascending)
       }
     }
-    Sort(sortOrder, global = global, logicalPlan)
+    withPlan {
+      Sort(sortOrder, global = global, logicalPlan)
+    }
+  }
+
+  /** A convenient function to wrap a logical plan and produce a DataFrame. */
+  @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
+    new DataFrame(sqlContext, logicalPlan)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6b87acd6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7b75aee..500227e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -107,13 +107,16 @@ class Dataset[T] private(
    * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    * objects that allow fields to be accessed by ordinal or name.
    */
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
   def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
 
-
   /**
    * Returns this Dataset.
    * @since 1.6.0
    */
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `ds.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
   def toDS(): Dataset[T] = this
 
   /**


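The new comments on toDF()/toDS() point at a Scala parsing rule that is easy
to trip over, so here is a self-contained sketch of it (hypothetical names,
not Spark code): if a method is declared without parentheses and its result
type defines apply, then passing an argument to the bare method name is
parsed as calling apply on the result.

    object ParenDemo {
      class Frame {
        // Stand-in for DataFrame's apply(colName) column selector.
        def apply(name: String): String = s"col($name)"
      }
      class Data {
        def noParens: Frame = new Frame      // declared WITHOUT parentheses
        def withParens(): Frame = new Frame  // declared WITH parentheses
      }
      def main(args: Array[String]): Unit = {
        val d = new Data
        // Parsed as d.noParens.apply("1") -- compiles, and is exactly the
        // surprise the comments above guard against.
        println(d.noParens("1"))
        // d.withParens("1") does not compile ("too many arguments");
        // apply has to be requested explicitly:
        println(d.withParens()("1"))
      }
    }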