Repository: spark
Updated Branches:
  refs/heads/master ade72c436 -> bbd8f5bee


[SPARK-4245][SQL] Fix containsNull of the result ArrayType of CreateArray 
expression.

The `containsNull` of the result `ArrayType` of `CreateArray` should be `true` 
only if the children is empty or there exists nullable child.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #3110 from ueshin/issues/SPARK-4245 and squashes the following commits:

6f64746 [Takuya UESHIN] Move equalsIgnoreNullability method into DataType.
5a90e02 [Takuya UESHIN] Refine InsertIntoHiveType and add some comments.
cbecba8 [Takuya UESHIN] Fix a test title.
884ec37 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-4245
3c5274b [Takuya UESHIN] Add tests to insert data of types ArrayType / MapType / 
StructType with nullability is false into Hive table.
41a94a9 [Takuya UESHIN] Replace InsertIntoTable with InsertIntoHiveTable if 
data types ignoring nullability are same.
43e6ef5 [Takuya UESHIN] Fix containsNull for empty array.
778e997 [Takuya UESHIN] Fix containsNull of the result ArrayType of CreateArray 
expression.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbd8f5be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbd8f5be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbd8f5be

Branch: refs/heads/master
Commit: bbd8f5bee81d5788c356977c173dd1edc42c77a3
Parents: ade72c4
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Fri Nov 14 14:21:16 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Nov 14 14:21:16 2014 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/complexTypes.scala |  4 +-
 .../spark/sql/catalyst/types/dataTypes.scala    | 21 ++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 27 +++++++++++
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 ++-
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 50 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bbd8f5be/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 19421e5..917b346 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -115,7 +115,9 @@ case class CreateArray(children: Seq[Expression]) extends 
Expression {
 
   override def dataType: DataType = {
     assert(resolved, s"Invalid dataType of mixed ArrayType 
${childTypes.mkString(",")}")
-    ArrayType(childTypes.headOption.getOrElse(NullType))
+    ArrayType(
+      childTypes.headOption.getOrElse(NullType),
+      containsNull = children.exists(_.nullable))
   }
 
   override def nullable: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/bbd8f5be/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 5dd19dd..ff1dc03 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -171,6 +171,27 @@ object DataType {
       case _ =>
     }
   }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, 
StructType.
+   */
+  def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+    (left, right) match {
+      case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
+        equalsIgnoreNullability(leftElementType, rightElementType)
+      case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, 
rightValueType, _)) =>
+        equalsIgnoreNullability(leftKeyType, rightKeyType) &&
+        equalsIgnoreNullability(leftValueType, rightValueType)
+      case (StructType(leftFields), StructType(rightFields)) =>
+        leftFields.size == rightFields.size &&
+        leftFields.zip(rightFields)
+          .forall{
+            case (left, right) =>
+              left.name == right.name && 
equalsIgnoreNullability(left.dataType, right.dataType)
+          }
+      case (left, right) => left == right
+    }
+  }
 }
 
 abstract class DataType {

http://git-wip-us.apache.org/repos/asf/spark/blob/bbd8f5be/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d446650..9045fc8 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -286,6 +286,12 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
 
       if (childOutputDataTypes == tableOutputDataTypes) {
         p
+      } else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
+        childOutputDataTypes.zip(tableOutputDataTypes)
+          .forall { case (left, right) => 
DataType.equalsIgnoreNullability(left, right) }) {
+        // If both types ignoring nullability of ArrayType, MapType, 
StructType are the same,
+        // use InsertIntoHiveTable instead of InsertIntoTable.
+        InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite)
       } else {
         // Only do the casting when child output data types differ from table 
output data types.
         val castedChildOutput = child.output.zip(table.output).map {
@@ -317,6 +323,27 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
 }
 
 /**
+ * A logical plan representing insertion into Hive table.
+ * This plan ignores nullability of ArrayType, MapType, StructType unlike 
InsertIntoTable
+ * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
+ */
+private[hive] case class InsertIntoHiveTable(
+    table: LogicalPlan,
+    partition: Map[String, Option[String]],
+    child: LogicalPlan,
+    overwrite: Boolean)
+  extends LogicalPlan {
+
+  override def children = child :: Nil
+  override def output = child.output
+
+  override lazy val resolved = childrenResolved && 
child.output.zip(table.output).forall {
+    case (childAttr, tableAttr) =>
+      DataType.equalsIgnoreNullability(childAttr.dataType, tableAttr.dataType)
+  }
+}
+
+/**
  * :: DeveloperApi ::
  * Provides conversions between Spark SQL data types and Hive Metastore types.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/bbd8f5be/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 989740c..3a49ddd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -161,7 +161,11 @@ private[hive] trait HiveStrategies {
   object DataSinks extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, 
overwrite) =>
-        InsertIntoHiveTable(table, partition, planLater(child), 
overwrite)(hiveContext) :: Nil
+        execution.InsertIntoHiveTable(
+          table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, 
child, overwrite) =>
+        execution.InsertIntoHiveTable(
+          table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.CreateTableAsSelect(
              Some(database), tableName, child, allowExisting, Some(extra: 
ASTNode)) =>
         CreateTableAsSelect(

http://git-wip-us.apache.org/repos/asf/spark/blob/bbd8f5be/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 5dbfb92..fb481ed 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -121,4 +121,54 @@ class InsertIntoHiveTableSuite extends QueryTest {
     sql("DROP TABLE table_with_partition")
     sql("DROP TABLE tmp_table")
   }
+
+  test("Insert ArrayType.containsNull == false") {
+    val schema = StructType(Seq(
+      StructField("a", ArrayType(StringType, containsNull = false))))
+    val rowRDD = TestHive.sparkContext.parallelize((1 to 100).map(i => 
Row(Seq(s"value$i"))))
+    val schemaRDD = applySchema(rowRDD, schema)
+    schemaRDD.registerTempTable("tableWithArrayValue")
+    sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM 
tableWithArrayValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithArrayValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithArrayValue")
+  }
+
+  test("Insert MapType.valueContainsNull == false") {
+    val schema = StructType(Seq(
+      StructField("m", MapType(StringType, StringType, valueContainsNull = 
false))))
+    val rowRDD = TestHive.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
+    val schemaRDD = applySchema(rowRDD, schema)
+    schemaRDD.registerTempTable("tableWithMapValue")
+    sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM 
tableWithMapValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithMapValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithMapValue")
+  }
+
+  test("Insert StructType.fields.exists(_.nullable == false)") {
+    val schema = StructType(Seq(
+      StructField("s", StructType(Seq(StructField("f", StringType, nullable = 
false))))))
+    val rowRDD = TestHive.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Row(s"value$i"))))
+    val schemaRDD = applySchema(rowRDD, schema)
+    schemaRDD.registerTempTable("tableWithStructValue")
+    sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM 
tableWithStructValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithStructValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithStructValue")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to