Repository: spark Updated Branches: refs/heads/branch-1.2 e35672e7e -> 680bc0619
[SQL] Don't shuffle code generated rows When sort-based shuffle and code gen are on, we were trying to ship the code-generated rows during a shuffle. This doesn't work because the classes don't exist on the other side. Instead, we now copy into a generic row before shipping. Author: Michael Armbrust <mich...@databricks.com> Closes #3263 from marmbrus/aggCodeGen and squashes the following commits: f6ba8cf [Michael Armbrust] fix and test (cherry picked from commit 4b4b50c9e596673c1534df97effad50d107a8007) Signed-off-by: Michael Armbrust <mich...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/680bc061 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/680bc061 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/680bc061 Branch: refs/heads/branch-1.2 Commit: 680bc06195ecdc6ff2390c55adeb637649f2c8f3 Parents: e35672e Author: Michael Armbrust <mich...@databricks.com> Authored: Fri Nov 14 15:03:23 2014 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Fri Nov 14 15:03:45 2014 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 927f400..cff7a01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -47,8 +47,8 @@ case
class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una // TODO: Eliminate redundant expressions in grouping key and value. val rdd = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => - val hashExpressions = newProjection(expressions, child.output) - iter.map(r => (hashExpressions(r), r.copy())) + val hashExpressions = newMutableProjection(expressions, child.output)() + iter.map(r => (hashExpressions(r).copy(), r.copy())) } } else { child.execute().mapPartitions { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/680bc061/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8a80724..5dd777f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { 2.5) } + test("aggregation with codegen") { + val originalValue = codegenEnabled + setConf(SQLConf.CODEGEN_ENABLED, "true") + sql("SELECT key FROM testData GROUP BY key").collect() + setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) + } + test("SPARK-3176 Added Parser of SQL LAST()") { checkAnswer( sql("SELECT LAST(n) FROM lowerCaseData"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org