Repository: spark Updated Branches: refs/heads/master fc3a6fe67 -> 0eee06158
[SQL] [MINOR] remove internalRowRDD in DataFrame Developers have already familiar with `queryExecution.toRDD` as internal row RDD, and we should not add new concept. Author: Wenchen Fan <cloud0...@outlook.com> Closes #7116 from cloud-fan/internal-rdd and squashes the following commits: 24756ca [Wenchen Fan] remove internalRowRDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eee0615 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eee0615 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eee0615 Branch: refs/heads/master Commit: 0eee0615894cda8ae1b2c8e61b8bda0ff648a219 Parents: fc3a6fe Author: Wenchen Fan <cloud0...@outlook.com> Authored: Wed Jul 1 01:02:33 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Jul 1 01:02:33 2015 -0700 ---------------------------------------------------------------------- sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 4 +--- .../org/apache/spark/sql/execution/stat/FrequentItems.scala | 2 +- .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 2 +- .../src/main/scala/org/apache/spark/sql/sources/commands.scala | 4 ++-- 4 files changed, 5 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 8fe1f7e..caad2da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1469,14 +1469,12 @@ class DataFrame private[sql]( lazy val rdd: RDD[Row] = { // use a local variable to make sure the map closure doesn't capture the whole DataFrame val schema = this.schema - internalRowRdd.mapPartitions { rows => + queryExecution.toRdd.mapPartitions { rows => val converter = CatalystTypeConverters.createToScalaConverter(schema) rows.map(converter(_).asInstanceOf[Row]) } } - private[sql] def internalRowRdd = queryExecution.executedPlan.execute() - /** * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s. * @group rdd http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala index 3ebbf96..4e2e2c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala @@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging { (name, originalSchema.fields(index).dataType) } - val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)( + val freqItems = df.select(cols.map(Column(_)) : _*).queryExecution.toRdd.aggregate(countMaps)( seqOp = (counts, row) => { var i = 0 while (i < numCols) { http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index b624ef7..23ddfa9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging { s"with dataType ${data.get.dataType} not supported.") } val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType))) - df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)( + df.select(columns: _*).queryExecution.toRdd.aggregate(new CovarianceCounter)( seqOp = (counter, row) => { counter.add(row.getDouble(0), row.getDouble(1)) }, http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 42b51ca..7214eb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation( writerContainer.driverSideSetup() try { - df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _) + df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) writerContainer.commitJob() relation.refresh() } catch { case cause: Throwable => @@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation( writerContainer.driverSideSetup() try { - df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _) + df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) writerContainer.commitJob() relation.refresh() } catch { case cause: Throwable => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org