Repository: spark
Updated Branches:
  refs/heads/master 3e831a269 -> 74335b310


[SPARK-5707] [SQL] fix serialization of generated projection

Author: Davies Liu <dav...@databricks.com>

Closes #7272 from davies/fix_projection and squashes the following commits:

075ef76 [Davies Liu] fix codegen with BroadcastHashJoin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74335b31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74335b31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74335b31

Branch: refs/heads/master
Commit: 74335b31072951244967f878d8b766cd1bfc2ac6
Parents: 3e831a2
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Jul 8 10:43:00 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Jul 8 10:43:00 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3 +--
 .../org/apache/spark/sql/execution/joins/HashOuterJoin.scala      | 2 +-
 .../org/apache/spark/sql/execution/joins/HashedRelation.scala     | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 06c244f..ab757fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin(
     // Note that we use .execute().collect() because we don't want to convert 
data to Scala types
     val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
     // buildHashTable uses code-generated rows as keys, which are not 
serializable
-    val hashed =
-      buildHashTable(input.iterator, new InterpretedProjection(buildKeys, 
buildPlan.output))
+    val hashed = buildHashTable(input.iterator, newProjection(buildKeys, 
buildPlan.output))
     sparkContext.broadcast(hashed)
   }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 3337451..0522ee8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType 
match {
       var existingMatchList = hashTable.get(rowKey)
       if (existingMatchList == null) {
         existingMatchList = new CompactBuffer[InternalRow]()
-        hashTable.put(rowKey, existingMatchList)
+        hashTable.put(rowKey.copy(), existingMatchList)
       }
 
       existingMatchList += currentRow.copy()

http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index de062c7..6b51f5d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -125,7 +125,7 @@ private[joins] object HashedRelation {
         val existingMatchList = hashTable.get(rowKey)
         val matchList = if (existingMatchList == null) {
           val newMatchList = new CompactBuffer[InternalRow]()
-          hashTable.put(rowKey, newMatchList)
+          hashTable.put(rowKey.copy(), newMatchList)
           newMatchList
         } else {
           keyIsUnique = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to