Repository: spark Updated Branches: refs/heads/branch-1.5 70bf170b9 -> 1b0f784a1
[SPARK-9667][SQL] followup: Use GenerateUnsafeProjection.canSupport to test Exchange supported data types. This way we recursively test the data types. cc chenghao-intel Author: Reynold Xin <r...@databricks.com> Closes #8036 from rxin/cansupport and squashes the following commits: f7302ff [Reynold Xin] Use GenerateUnsafeProjection.canSupport to test Exchange supported data types. (cherry picked from commit aeddeafc03d77a5149d2c8f9489b0ca83e6b3e03) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b0f784a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b0f784a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b0f784a Branch: refs/heads/branch-1.5 Commit: 1b0f784a1fb7cfa30ec29f943b0648e4cbb7cf5d Parents: 70bf170 Author: Reynold Xin <r...@databricks.com> Authored: Fri Aug 7 13:26:03 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Aug 7 13:28:08 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/Exchange.scala | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1b0f784a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 60087f2..49bb729 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -27,9 +27,9 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree import 
org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.util.MutablePair import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} @@ -43,18 +43,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange" /** - * Returns true iff the children outputs aggregate UDTs that are not part of the SQL type. - * This only happens with the old aggregate implementation and should be removed in 1.6. + * Returns true iff we can support the data type, and we are not doing range partitioning. */ private lazy val tungstenMode: Boolean = { - val unserializableUDT = child.schema.exists(_.dataType match { - case _: UserDefinedType[_] => true - case _ => false - }) - // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to - // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to - // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed. - !unserializableUDT && !newPartitioning.isInstanceOf[RangePartitioning] + GenerateUnsafeProjection.canSupport(child.schema) && + !newPartitioning.isInstanceOf[RangePartitioning] } override def outputPartitioning: Partitioning = newPartitioning --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org