spark git commit: [SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

2015-07-07 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 4ca90935c -> 68a4a1697


[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

https://issues.apache.org/jira/browse/SPARK-8868

Author: Yin Huai 

Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:

cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68a4a169
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68a4a169
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68a4a169

Branch: refs/heads/master
Commit: 68a4a169714e11d8c537ad9431ae9974f6b7e8d3
Parents: 4ca9093
Author: Yin Huai 
Authored: Tue Jul 7 18:36:35 2015 -0700
Committer: Josh Rosen 
Committed: Tue Jul 7 18:36:35 2015 -0700

--
 .../sql/execution/SparkSqlSerializer2.scala | 25 
 .../execution/SparkSqlSerializer2Suite.scala| 20 +++-
 2 files changed, 39 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 056d435..6ed822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -179,23 +179,38 @@ private[sql] object SparkSqlSerializer2 {
 
   /**
    * Check if rows with the given schema can be serialized with ShuffleSerializer.
+   * Right now, we do not support a schema having complex types or UDTs, or all data types
+   * of fields are NullTypes.
    */
   def support(schema: Array[DataType]): Boolean = {
 if (schema == null) return true
 
+var allNullTypes = true
 var i = 0
 while (i < schema.length) {
   schema(i) match {
-case udt: UserDefinedType[_] => return false
-case array: ArrayType => return false
-case map: MapType => return false
-case struct: StructType => return false
+case NullType => // Do nothing
+case udt: UserDefinedType[_] =>
+  allNullTypes = false
+  return false
+case array: ArrayType =>
+  allNullTypes = false
+  return false
+case map: MapType =>
+  allNullTypes = false
+  return false
+case struct: StructType =>
+  allNullTypes = false
+  return false
 case _ =>
+  allNullTypes = false
   }
   i += 1
 }
 
-return true
+// If types of fields are all NullTypes, we return false.
+// Otherwise, we return true.
+return !allNullTypes
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
index 8631e24..71f6b26 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
@@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   }
 
   checkSupported(null, isSupported = true)
-  checkSupported(NullType, isSupported = true)
   checkSupported(BooleanType, isSupported = true)
   checkSupported(ByteType, isSupported = true)
   checkSupported(ShortType, isSupported = true)
@@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   checkSupported(DecimalType(10, 5), isSupported = true)
   checkSupported(DecimalType.Unlimited, isSupported = true)
 
+  // If NullType is the only data type in the schema, we do not support it.
+  checkSupported(NullType, isSupported = false)
   // For now, ArrayType, MapType, and StructType are not supported.
   checkSupported(ArrayType(DoubleType, true), isSupported = false)
   checkSupported(ArrayType(StringType, false), isSupported = false)
@@ -170,6 +171,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
 val df = ctx.sql(s"SELECT 1 + 1 FROM shuffle")
 checkSerializer(df.queryExecution.executedPlan, 
classOf[SparkSqlSerializer])
   }
+
+  test("types of fields are all NullTypes") {
+// Test

spark git commit: [SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

2015-07-07 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 83a621a5a -> d3d5f2ab2


[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

https://issues.apache.org/jira/browse/SPARK-8868

Author: Yin Huai 

Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:

cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.

(cherry picked from commit 68a4a169714e11d8c537ad9431ae9974f6b7e8d3)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3d5f2ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3d5f2ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3d5f2ab

Branch: refs/heads/branch-1.4
Commit: d3d5f2ab25acaf261753738e63ae30a34d28c291
Parents: 83a621a
Author: Yin Huai 
Authored: Tue Jul 7 18:36:35 2015 -0700
Committer: Josh Rosen 
Committed: Tue Jul 7 18:48:12 2015 -0700

--
 .../sql/execution/SparkSqlSerializer2.scala | 25 
 .../execution/SparkSqlSerializer2Suite.scala| 20 +++-
 2 files changed, 39 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3d5f2ab/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 256d527..65c0e52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -186,23 +186,38 @@ private[sql] object SparkSqlSerializer2 {
 
   /**
    * Check if rows with the given schema can be serialized with ShuffleSerializer.
+   * Right now, we do not support a schema having complex types or UDTs, or all data types
+   * of fields are NullTypes.
    */
   def support(schema: Array[DataType]): Boolean = {
 if (schema == null) return true
 
+var allNullTypes = true
 var i = 0
 while (i < schema.length) {
   schema(i) match {
-case udt: UserDefinedType[_] => return false
-case array: ArrayType => return false
-case map: MapType => return false
-case struct: StructType => return false
+case NullType => // Do nothing
+case udt: UserDefinedType[_] =>
+  allNullTypes = false
+  return false
+case array: ArrayType =>
+  allNullTypes = false
+  return false
+case map: MapType =>
+  allNullTypes = false
+  return false
+case struct: StructType =>
+  allNullTypes = false
+  return false
 case _ =>
+  allNullTypes = false
   }
   i += 1
 }
 
-return true
+// If types of fields are all NullTypes, we return false.
+// Otherwise, we return true.
+return !allNullTypes
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d3d5f2ab/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
index 6ca5390..05af102 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
@@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   }
 
   checkSupported(null, isSupported = true)
-  checkSupported(NullType, isSupported = true)
   checkSupported(BooleanType, isSupported = true)
   checkSupported(ByteType, isSupported = true)
   checkSupported(ShortType, isSupported = true)
@@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   checkSupported(DecimalType(10, 5), isSupported = true)
   checkSupported(DecimalType.Unlimited, isSupported = true)
 
+  // If NullType is the only data type in the schema, we do not support it.
+  checkSupported(NullType, isSupported = false)
   // For now, ArrayType, MapType, and StructType are not supported.
   checkSupported(ArrayType(DoubleType, true), isSupported = false)
   checkSupported(ArrayType(StringType, false), isSupported = false)
@@ -169,6 +170,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest 
with BeforeAndAfterAll
 val df = sql(s"SELECT 1 + 1 FROM shuffle")
 checkSerializer(df.queryExecution.executedP