Repository: spark
Updated Branches:
  refs/heads/master 9cf56c96b -> c34c27fe9


[SPARK-9034][SQL] Reflect field names defined in GenericUDTF

Hive's GenericUDTF#initialize() defines field names in the schema it returns, but
the current HiveGenericUDTF drops these names.
These names should be reflected in the logical plan tree.

Author: navis.ryu <na...@apache.org>

Closes #8456 from navis/SPARK-9034.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c34c27fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c34c27fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c34c27fe

Branch: refs/heads/master
Commit: c34c27fe9244939d8c905cd689536dfb81c74d7d
Parents: 9cf56c9
Author: navis.ryu <na...@apache.org>
Authored: Mon Nov 2 23:52:36 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Nov 2 23:52:36 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 11 +++++------
 .../spark/sql/catalyst/expressions/generators.scala     | 12 +++++++-----
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 10 +++++-----
 .../sql/hive/execution/HiveCompatibilitySuite.scala     |  1 +
 .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala |  2 +-
 ...in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e |  1 +
 ...in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 |  1 +
 ...eral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada |  1 +
 ...eral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7 |  0
 ...eral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42 |  2 ++
 ...eral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a |  0
 ...eral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56 |  2 ++
 ...teral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe |  0
 ...eral_view_noalias-6-16d227442dd775615c6ecfceedc6c612 |  0
 ...eral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3 |  2 ++
 .../spark/sql/hive/execution/HiveQuerySuite.scala       |  6 ++++++
 16 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 912c967..899ee67 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -147,7 +147,7 @@ class Analyzer(
             case u @ UnresolvedAlias(child) => child match {
               case ne: NamedExpression => ne
               case e if !e.resolved => u
-              case g: Generator if g.elementTypes.size > 1 => MultiAlias(g, 
Nil)
+              case g: Generator => MultiAlias(g, Nil)
               case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)()
               case other => Alias(other, s"_c$i")()
             }
@@ -722,7 +722,7 @@ class Analyzer(
 
     /**
      * Construct the output attributes for a [[Generator]], given a list of 
names.  If the list of
-     * names is empty names are assigned by ordinal (i.e., _c0, _c1, ...) to 
match Hive's defaults.
+     * names is empty names are assigned from field names in generator.
      */
     private def makeGeneratorOutput(
         generator: Generator,
@@ -731,13 +731,12 @@ class Analyzer(
 
       if (names.length == elementTypes.length) {
         names.zip(elementTypes).map {
-          case (name, (t, nullable)) =>
+          case (name, (t, nullable, _)) =>
             AttributeReference(name, t, nullable)()
         }
       } else if (names.isEmpty) {
-        elementTypes.zipWithIndex.map {
-          // keep the default column names as Hive does _c0, _c1, _cN
-          case ((t, nullable), i) => AttributeReference(s"_c$i", t, nullable)()
+        elementTypes.map {
+          case (t, nullable, name) => AttributeReference(name, t, nullable)()
         }
       } else {
         failAnalysis(

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 1a2092c..894a073 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -53,7 +53,7 @@ trait Generator extends Expression {
    * The output element data types in structure of Seq[(DataType, Nullable)]
    * TODO we probably need to add more information like metadata etc.
    */
-  def elementTypes: Seq[(DataType, Boolean)]
+  def elementTypes: Seq[(DataType, Boolean, String)]
 
   /** Should be implemented by child classes to perform specific Generators. */
   override def eval(input: InternalRow): TraversableOnce[InternalRow]
@@ -69,7 +69,7 @@ trait Generator extends Expression {
  * A generator that produces its output using the provided lambda function.
  */
 case class UserDefinedGenerator(
-    elementTypes: Seq[(DataType, Boolean)],
+    elementTypes: Seq[(DataType, Boolean, String)],
     function: Row => TraversableOnce[InternalRow],
     children: Seq[Expression])
   extends Generator with CodegenFallback {
@@ -112,9 +112,11 @@ case class Explode(child: Expression) extends 
UnaryExpression with Generator wit
     }
   }
 
-  override def elementTypes: Seq[(DataType, Boolean)] = child.dataType match {
-    case ArrayType(et, containsNull) => (et, containsNull) :: Nil
-    case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, 
valueContainsNull) :: Nil
+  // hive-compatible default alias for explode function ("col" for array, 
"key", "value" for map)
+  override def elementTypes: Seq[(DataType, Boolean, String)] = child.dataType 
match {
+    case ArrayType(et, containsNull) => (et, containsNull, "col") :: Nil
+    case MapType(kt, vt, valueContainsNull) =>
+      (kt, false, "key") :: (vt, valueContainsNull, "value") :: Nil
   }
 
   override def eval(input: InternalRow): TraversableOnce[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 53ad3c0..fc0ab63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1175,7 +1175,8 @@ class DataFrame private[sql](
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => 
TraversableOnce[A]): DataFrame = {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
-    val elementTypes = schema.toAttributes.map { attr => (attr.dataType, 
attr.nullable) }
+    val elementTypes = schema.toAttributes.map {
+      attr => (attr.dataType, attr.nullable, attr.name) }
     val names = schema.toAttributes.map(_.name)
     val convert = CatalystTypeConverters.createToCatalystConverter(schema)
 
@@ -1184,7 +1185,7 @@ class DataFrame private[sql](
     val generator = UserDefinedGenerator(elementTypes, rowFunction, 
input.map(_.expr))
 
     Generate(generator, join = true, outer = false,
-      qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan)
+      qualifier = None, generatorOutput = Nil, logicalPlan)
   }
 
   /**
@@ -1203,8 +1204,7 @@ class DataFrame private[sql](
     val dataType = ScalaReflection.schemaFor[B].dataType
     val attributes = AttributeReference(outputColumn, dataType)() :: Nil
     // TODO handle the metadata?
-    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable) 
}
-    val names = attributes.map(_.name)
+    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, 
attr.name) }
 
     def rowFunction(row: Row): TraversableOnce[InternalRow] = {
       val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
@@ -1213,7 +1213,7 @@ class DataFrame private[sql](
     val generator = UserDefinedGenerator(elementTypes, rowFunction, 
apply(inputColumn).expr :: Nil)
 
     Generate(generator, join = true, outer = false,
-      qualifier = None, names.map(UnresolvedAttribute(_)), logicalPlan)
+      qualifier = None, generatorOutput = Nil, logicalPlan)
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 6ed40b0..2d0d7b8 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -661,6 +661,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
     "join_star",
     "lateral_view",
     "lateral_view_cp",
+    "lateral_view_noalias",
     "lateral_view_ppd",
     "leftsemijoin",
     "leftsemijoin_mr",

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 0b5e863..a9db701 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -511,7 +511,7 @@ private[hive] case class HiveGenericUDTF(
   protected lazy val collector = new UDTFCollector
 
   override lazy val elementTypes = 
outputInspector.getAllStructFieldRefs.asScala.map {
-    field => (inspectorToDataType(field.getFieldObjectInspector), true)
+    field => (inspectorToDataType(field.getFieldObjectInspector), true, 
field.getFieldName)
   }
 
   @transient

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/SPARK-9034
 Reflect field names defined in GenericUDTF 
#1-0-ff502d8c06f4b32f57aa45057b7fab0e
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names 
defined in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e 
b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in 
GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e
new file mode 100644
index 0000000..1cf253f
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined 
in GenericUDTF #1-0-ff502d8c06f4b32f57aa45057b7fab0e    
@@ -0,0 +1 @@
+238

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/SPARK-9034
 Reflect field names defined in GenericUDTF 
#2-0-d6d0def30a7fad5f90fd835361820c30
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names 
defined in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30 
b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined in 
GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30
new file mode 100644
index 0000000..60878ff
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/SPARK-9034 Reflect field names defined 
in GenericUDTF #2-0-d6d0def30a7fad5f90fd835361820c30    
@@ -0,0 +1 @@
+238    val_238

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-0-50131c0ba7b7a6b65c789a5a8497bada
@@ -0,0 +1 @@
+0

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-1-72509f06e1f7c5d5ccc292f775f8eea7
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
new file mode 100644
index 0000000..0da0d93
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-2-6d5806dd1d2511911a5de1e205523f42
@@ -0,0 +1,2 @@
+key1   100
+key2   200

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-3-155b3cc2f5054725a9c2acca3c38c00a
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
new file mode 100644
index 0000000..0da0d93
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-4-3b7045ace234af8e5e86d8ac23ccee56
@@ -0,0 +1,2 @@
+key1   100
+key2   200

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-5-e1eca4e08216897d090259d4fd1e3fe
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-6-16d227442dd775615c6ecfceedc6c612
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
new file mode 100644
index 0000000..4ba46bb
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/lateral_view_noalias-7-66cb5ab20690dd85b2ed95bbfb9481d3
@@ -0,0 +1,2 @@
+key1   100     key1    100
+key2   200     key2    200

http://git-wip-us.apache.org/repos/asf/spark/blob/c34c27fe/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e597d68..fc72e3c 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -563,6 +563,12 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
   createQueryTest("Specify the udtf output",
     "SELECT d FROM (SELECT explode(array(1,1)) d FROM src LIMIT 1) t")
 
+  createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #1",
+    "SELECT col FROM (SELECT explode(array(key,value)) FROM src LIMIT 1) t")
+
+  createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #2",
+    "SELECT key,value FROM (SELECT explode(map(key,value)) FROM src LIMIT 1) 
t")
+
   test("sampling") {
     sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s")
     sql("SELECT * FROM src TABLESAMPLE(100 PERCENT) s")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to