Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6e2e84f3e -> 5ccc1eb08


[SPARK-11590][SQL] use native json_tuple in lateral view

Author: Wenchen Fan <wenc...@databricks.com>

Closes #9562 from cloud-fan/json-tuple.

(cherry picked from commit 53600854c270d4c953fe95fbae528740b5cf6603)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ccc1eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ccc1eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ccc1eb0

Branch: refs/heads/branch-1.6
Commit: 5ccc1eb08c14291bb1e94b1cd9fa3bff1172529d
Parents: 6e2e84f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Nov 10 11:21:31 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Nov 10 11:22:11 2015 -0800

----------------------------------------------------------------------
 .../catalyst/expressions/jsonExpressions.scala  | 23 +++++----------
 .../expressions/JsonExpressionsSuite.scala      | 30 +++++++++++--------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  8 +++--
 .../scala/org/apache/spark/sql/functions.scala  | 12 ++++++++
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 23 ++++++++-------
 .../org/apache/spark/sql/hive/HiveQl.scala      |  4 +++
 .../org/apache/spark/sql/hive/HiveQlSuite.scala | 13 ++++++++
 .../sql/hive/execution/SQLQuerySuite.scala      | 31 ++++++++++++++++++++
 8 files changed, 104 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8c9853e..8cd7323 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression)
 }
 
 case class JsonTuple(children: Seq[Expression])
-  extends Expression with CodegenFallback {
+  extends Generator with CodegenFallback {
 
   import SharedFactory._
 
@@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression])
   }
 
   // if processing fails this shared value will be returned
-  @transient private lazy val nullRow: InternalRow =
-    new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+  @transient private lazy val nullRow: Seq[InternalRow] =
+    new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil
 
   // the json body is the first child
   @transient private lazy val jsonExpr: Expression = children.head
@@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression])
   // and count the number of foldable fields, we'll use this later to optimize 
evaluation
   @transient private lazy val constantFields: Int = foldableFieldNames.count(_ 
!= null)
 
-  override lazy val dataType: StructType = {
-    val fields = fieldExpressions.zipWithIndex.map {
-      case (_, idx) => StructField(
-        name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
-        dataType = StringType,
-        nullable = true)
-    }
-
-    StructType(fields)
+  override def elementTypes: Seq[(DataType, Boolean, String)] = 
fieldExpressions.zipWithIndex.map {
+    case (_, idx) => (StringType, true, s"c$idx")
   }
 
   override def prettyName: String = "json_tuple"
@@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
-  override def eval(input: InternalRow): InternalRow = {
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
     if (json == null) {
       return nullRow
@@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
-  private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = {
+  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
     // only objects are supported
     if (parser.nextToken() != JsonToken.START_OBJECT) {
       return nullRow
@@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression])
       parser.skipChildren()
     }
 
-    new GenericInternalRow(row)
+    new GenericInternalRow(row) :: Nil
   }
 
   private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index f33125f..7b75409 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -209,8 +209,12 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     Literal("f5") ::
     Nil
 
+  private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
+    assert(jt.eval(null).toSeq.head === expected)
+  }
+
   test("json_tuple - hive key 1") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(
         Literal("""{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
           jsonTupleQuery),
@@ -218,7 +222,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 2") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(
         Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") 
::
           jsonTupleQuery),
@@ -226,7 +230,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 2 (mix of foldable fields)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 
4.01}""") ::
         Literal("f1") ::
         NonFoldableLiteral("f2") ::
@@ -238,7 +242,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 3") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(
         Literal("""{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 
2, "f5": 5.01}""") ::
           jsonTupleQuery),
@@ -247,7 +251,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 3 (nonfoldable json)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(
         NonFoldableLiteral(
           """{"f1": "value13", "f4": "value44",
@@ -258,7 +262,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 3 (nonfoldable fields)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal(
         """{"f1": "value13", "f4": "value44",
           | "f3": "value33", "f2": 2, "f5": 5.01}""".stripMargin) ::
@@ -273,43 +277,43 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("json_tuple - hive key 4 - null json") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal(null) :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(null, null, null, null, null)))
   }
 
   test("json_tuple - hive key 5 - null and empty fields") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, 
null)))
   }
 
   test("json_tuple - hive key 6 - invalid json (array)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(null, null, null, null, null)))
   }
 
   test("json_tuple - invalid json (object start only)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("{") :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(null, null, null, null, null)))
   }
 
   test("json_tuple - invalid json (no object end)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(null, null, null, null, null)))
   }
 
   test("json_tuple - invalid json (invalid json)") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("\\") :: jsonTupleQuery),
       InternalRow.fromSeq(Seq(null, null, null, null, null)))
   }
 
   test("json_tuple - preserve newlines") {
-    checkEvaluation(
+    checkJsonTuple(
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
       InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3b69247..9368435 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -750,10 +750,14 @@ class DataFrame private[sql](
       // will remove intermediate Alias for ExtractValue chain, and we need to 
alias it again to
       // make it a NamedExpression.
       case Column(u: UnresolvedAttribute) => UnresolvedAlias(u)
+
       case Column(expr: NamedExpression) => expr
-      // Leave an unaliased explode with an empty list of names since the 
analyzer will generate the
-      // correct defaults after the nested expression's type has been resolved.
+
+      // Leave an unaliased generator with an empty list of names since the 
analyzer will generate
+      // the correct defaults after the nested expression's type has been 
resolved.
       case Column(explode: Explode) => MultiAlias(explode, Nil)
+      case Column(jt: JsonTuple) => MultiAlias(jt, Nil)
+
       case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
     Project(namedExpressions.toSeq, logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 22104e4..a59d738 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2308,6 +2308,18 @@ object functions extends LegacyFunctions {
   def explode(e: Column): Column = withExpr { Explode(e.expr) }
 
   /**
+   * Creates a new row for a json column according to the given field names.
+   *
+   * @group collection_funcs
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def json_tuple(json: Column, fields: String*): Column = withExpr {
+    require(fields.length > 0, "at least 1 field name should be given.")
+    JsonTuple(json.expr +: fields.map(Literal.apply))
+  }
+
+  /**
    * Returns length of array or map.
    *
    * @group collection_funcs

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index e3531d0..14fd56f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -41,23 +41,26 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
   test("json_tuple select") {
     val df: DataFrame = tuples.toDF("key", "jstring")
-    val expected = Row("1", Row("value1", "value2", "3", null, "5.23")) ::
-      Row("2", Row("value12", "2", "value3", "4.01", null)) ::
-      Row("3", Row("value13", "2", "value33", "value44", "5.01")) ::
-      Row("4", Row(null, null, null, null, null)) ::
-      Row("5", Row("", null, null, null, null)) ::
-      Row("6", Row(null, null, null, null, null)) ::
+    val expected =
+      Row("1", "value1", "value2", "3", null, "5.23") ::
+      Row("2", "value12", "2", "value3", "4.01", null) ::
+      Row("3", "value13", "2", "value33", "value44", "5.01") ::
+      Row("4", null, null, null, null, null) ::
+      Row("5", "", null, null, null, null) ::
+      Row("6", null, null, null, null, null) ::
       Nil
 
-    checkAnswer(df.selectExpr("key", "json_tuple(jstring, 'f1', 'f2', 'f3', 
'f4', 'f5')"), expected)
+    checkAnswer(
+      df.select($"key", functions.json_tuple($"jstring", "f1", "f2", "f3", 
"f4", "f5")),
+      expected)
   }
 
   test("json_tuple filter and group") {
     val df: DataFrame = tuples.toDF("key", "jstring")
     val expr = df
-      .selectExpr("json_tuple(jstring, 'f1', 'f2') as jt")
-      .where($"jt.c0".isNotNull)
-      .groupBy($"jt.c1")
+      .select(functions.json_tuple($"jstring", "f1", "f2"))
+      .where($"c0".isNotNull)
+      .groupBy($"c1")
       .count()
 
     val expected = Row(null, 1) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 6f8ed41..091caab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1821,6 +1821,7 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
   }
 
   val explode = "(?i)explode".r
+  val jsonTuple = "(?i)json_tuple".r
   def nodesToGenerator(nodes: Seq[Node]): (Generator, Seq[String]) = {
     val function = nodes.head
 
@@ -1833,6 +1834,9 @@ 
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       case Token("TOK_FUNCTION", Token(explode(), Nil) :: child :: Nil) =>
         (Explode(nodeToExpr(child)), attributes)
 
+      case Token("TOK_FUNCTION", Token(jsonTuple(), Nil) :: children) =>
+        (JsonTuple(children.map(nodeToExpr)), attributes)
+
       case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) =>
         val functionInfo: FunctionInfo =
           
Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse(

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 528a739..a330362 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.spark.sql.catalyst.expressions.JsonTuple
+import org.apache.spark.sql.catalyst.plans.logical.Generate
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
@@ -183,4 +185,15 @@ class HiveQlSuite extends SparkFunSuite with 
BeforeAndAfterAll {
     assertError("select interval '.1111111111' second",
       "nanosecond 1111111111 outside range")
   }
+
+  test("use native json_tuple instead of hive's UDTF in LATERAL VIEW") {
+    val plan = HiveQl.parseSql(
+      """
+        |SELECT *
+        |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+        |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b
+      """.stripMargin)
+
+    
assert(plan.children.head.asInstanceOf[Generate].generator.isInstanceOf[JsonTuple])
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccc1eb0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9a425d7..3427152 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1448,4 +1448,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
         Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") 
:: Nil)
     }
   }
+
+  test("SPARK-11590: use native json_tuple in lateral view") {
+    checkAnswer(sql(
+      """
+        |SELECT a, b
+        |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+        |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b
+      """.stripMargin), Row("value1", "12"))
+
+    // we should use `c0`, `c1`... as the name of fields if no alias is 
provided, to follow hive.
+    checkAnswer(sql(
+      """
+        |SELECT c0, c1
+        |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+        |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt
+      """.stripMargin), Row("value1", "12"))
+
+    // we can also use `json_tuple` in project list.
+    checkAnswer(sql(
+      """
+        |SELECT json_tuple(json, 'f1', 'f2')
+        |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+      """.stripMargin), Row("value1", "12"))
+
+    // we can also mix `json_tuple` with other project expressions.
+    checkAnswer(sql(
+      """
+        |SELECT json_tuple(json, 'f1', 'f2'), 3.14, str
+        |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
+      """.stripMargin), Row("value1", "12", 3.14, "hello"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to