Repository: spark
Updated Branches:
  refs/heads/branch-1.5 70bf170b9 -> 1b0f784a1


[SPARK-9667][SQL] followup: Use GenerateUnsafeProjection.canSupport to test Exchange supported data types.

This way we recursively test the data types.

cc chenghao-intel

Author: Reynold Xin <r...@databricks.com>

Closes #8036 from rxin/cansupport and squashes the following commits:

f7302ff [Reynold Xin] Use GenerateUnsafeProjection.canSupport to test Exchange supported data types.

(cherry picked from commit aeddeafc03d77a5149d2c8f9489b0ca83e6b3e03)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b0f784a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b0f784a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b0f784a

Branch: refs/heads/branch-1.5
Commit: 1b0f784a1fb7cfa30ec29f943b0648e4cbb7cf5d
Parents: 70bf170
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Aug 7 13:26:03 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Aug 7 13:28:08 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/Exchange.scala    | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b0f784a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 60087f2..49bb729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 
@@ -43,18 +43,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange"
 
   /**
-   * Returns true iff the children outputs aggregate UDTs that are not part of the SQL type.
-   * This only happens with the old aggregate implementation and should be removed in 1.6.
+   * Returns true iff we can support the data type, and we are not doing range partitioning.
    */
   private lazy val tungstenMode: Boolean = {
-    val unserializableUDT = child.schema.exists(_.dataType match {
-      case _: UserDefinedType[_] => true
-      case _ => false
-    })
-    // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
-    // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
-    // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
-    !unserializableUDT && !newPartitioning.isInstanceOf[RangePartitioning]
+    GenerateUnsafeProjection.canSupport(child.schema) &&
+      !newPartitioning.isInstanceOf[RangePartitioning]
   }
 
   override def outputPartitioning: Partitioning = newPartitioning
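The point of the change is that `GenerateUnsafeProjection.canSupport` walks nested data types recursively, whereas the old check only inspected the top-level fields of the child schema for UDTs. As a rough, self-contained sketch of that idea (a toy `DataType` ADT and `canSupport`, not Spark's actual classes, whose details differ):

```scala
// Toy, simplified mirror of a recursive type-support check: walk nested
// data types and reject any subtree containing an unsupported type
// (here, a user-defined type), instead of scanning only top-level fields.
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType
case class ArrayType(element: DataType) extends DataType
case class StructType(fields: Seq[DataType]) extends DataType
case class UserDefinedType(sqlType: DataType) extends DataType

object CanSupport {
  def canSupport(dt: DataType): Boolean = dt match {
    case IntType | StringType => true
    case ArrayType(elem)      => canSupport(elem)      // recurse into elements
    case StructType(fields)   => fields.forall(canSupport) // recurse into fields
    case UserDefinedType(_)   => false                 // unsupported type found
  }
}

object Demo extends App {
  // A flat, top-level-only scan would miss the UDT nested inside the array;
  // the recursive check finds it and returns false.
  val schema = StructType(Seq(IntType, ArrayType(UserDefinedType(StringType))))
  println(CanSupport.canSupport(schema)) // false
}
```

Delegating to the projection's own `canSupport` also keeps `Exchange` from duplicating (and drifting from) the codegen path's notion of which types are serializable.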

