Repository: spark
Updated Branches:
  refs/heads/master 1e437835e -> 1007cae20


[SPARK-25447][SQL] Support JSON options by schema_of_json()

## What changes were proposed in this pull request?

In this PR, I propose to extend the `schema_of_json()` function to accept
JSON options, since they can affect schema inference. The purpose is to support
the same options that `from_json` can use during schema inference.

## How was this patch tested?

Added SQL, Python and Scala tests (`JsonExpressionsSuite` and
`JsonFunctionsSuite`) that check that JSON options are used.

Closes #22442 from MaxGekk/schema_of_json-options.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1007cae2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1007cae2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1007cae2

Branch: refs/heads/master
Commit: 1007cae20e8f566e7d7c25f0f81c9b84f352b6d5
Parents: 1e43783
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Sat Sep 29 17:53:30 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sat Sep 29 17:53:30 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 11 ++++++--
 .../catalyst/expressions/jsonExpressions.scala  | 28 +++++++++++++++-----
 .../expressions/JsonExpressionsSuite.scala      | 12 +++++++--
 .../scala/org/apache/spark/sql/functions.scala  | 15 +++++++++++
 .../sql-tests/inputs/json-functions.sql         |  4 +++
 .../sql-tests/results/json-functions.sql.out    | 18 ++++++++++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  8 ++++++
 7 files changed, 85 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e5bc1ea..74f0685 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2348,11 +2348,15 @@ def to_json(col, options={}):
 
 @ignore_unicode_prefix
 @since(2.4)
-def schema_of_json(col):
+def schema_of_json(col, options={}):
     """
     Parses a column containing a JSON string and infers its schema in DDL 
format.
 
     :param col: string column in json format
+    :param options: options to control parsing. accepts the same options as 
the JSON datasource
+
+    .. versionchanged:: 2.5
+       It accepts `options` parameter to control schema inferring.
 
     >>> from pyspark.sql.types import *
     >>> data = [(1, '{"a": 1}')]
@@ -2361,10 +2365,13 @@ def schema_of_json(col):
     [Row(json=u'struct<a:bigint>')]
     >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
+    >>> schema = schema_of_json(lit('{a: 1}'), 
{'allowUnquotedFieldNames':'true'})
+    >>> df.select(schema.alias("json")).collect()
+    [Row(json=u'struct<a:bigint>')]
     """
 
     sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.schema_of_json(_to_java_column(col))
+    jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options)
     return Column(jc)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index bd9090a..f5297dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -740,15 +740,31 @@ case class StructsToJson(
   examples = """
     Examples:
       > SELECT _FUNC_('[{"col":0}]');
-       array<struct<col:int>>
+       array<struct<col:bigint>>
+      > SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true'));
+       array<struct<col:bigint>>
   """,
   since = "2.4.0")
-case class SchemaOfJson(child: Expression)
+case class SchemaOfJson(
+    child: Expression,
+    options: Map[String, String])
   extends UnaryExpression with String2StringExpression with CodegenFallback {
 
-  private val jsonOptions = new JSONOptions(Map.empty, "UTC")
-  private val jsonFactory = new JsonFactory()
-  jsonOptions.setJacksonOptions(jsonFactory)
+  def this(child: Expression) = this(child, Map.empty[String, String])
+
+  def this(child: Expression, options: Expression) = this(
+      child = child,
+      options = JsonExprUtils.convertToMapData(options))
+
+  @transient
+  private lazy val jsonOptions = new JSONOptions(options, "UTC")
+
+  @transient
+  private lazy val jsonFactory = {
+    val factory = new JsonFactory()
+    jsonOptions.setJacksonOptions(factory)
+    factory
+  }
 
   override def convert(v: UTF8String): UTF8String = {
     val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, 
v)) { parser =>
@@ -764,7 +780,7 @@ object JsonExprUtils {
 
   def evalSchemaExpr(exp: Expression): DataType = exp match {
     case Literal(s, StringType) => DataType.fromDDL(s.toString)
-    case e @ SchemaOfJson(_: Literal) =>
+    case e @ SchemaOfJson(_: Literal, _) =>
       val ddlSchema = e.eval().asInstanceOf[UTF8String]
       DataType.fromDDL(ddlSchema.toString)
     case e => throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 0e9c8ab..34fdd0c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -707,9 +707,17 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   }
 
   test("SPARK-24709: infer schema of json strings") {
-    checkEvaluation(SchemaOfJson(Literal.create("""{"col":0}""")), 
"struct<col:bigint>")
+    checkEvaluation(new SchemaOfJson(Literal.create("""{"col":0}""")),
+      "struct<col:bigint>")
     checkEvaluation(
-      SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": 
"b"}}""")),
+      new SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": 
"b"}}""")),
       "struct<col0:array<string>,col1:struct<col2:string>>")
   }
+
+  test("infer schema of JSON strings by using options") {
+    checkEvaluation(
+      new SchemaOfJson(Literal.create("""{"col":01}"""),
+        CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), 
Literal.create("true")))),
+      "struct<col:bigint>")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4c58e77..59a1fcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3612,6 +3612,21 @@ object functions {
   def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
 
   /**
+   * Parses a column containing a JSON string and infers its schema using 
options.
+   *
+   * @param e a string column containing JSON data.
+   * @param options options to control how the json is parsed. accepts the 
same options as the
+   *                json data source. See [[DataFrameReader#json]].
+   * @return a column with string literal containing schema in DDL format.
+   *
+   * @group collection_funcs
+   * @since 2.5.0
+   */
+  def schema_of_json(e: Column, options: java.util.Map[String, String]): 
Column = {
+    withExpr(SchemaOfJson(e.expr, options.asScala.toMap))
+  }
+
+  /**
    * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` 
or
    * a `MapType` into a JSON string with the specified schema.
    * Throws an exception, in the case of an unsupported type.

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 0f22c0e..bdd1fe4 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -56,3 +56,7 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
 select to_json(array('1', '2', '3'));
 select to_json(array(array(1, 2, 3), array(4)));
 
+-- infer schema of json literal using options
+select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'));
+select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 
'true', 'prefersDecimal', 'true'));
+

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index e550b43..77e9000 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 40
+-- Number of queries: 42
 
 
 -- !query 0
@@ -370,3 +370,19 @@ select to_json(array(array(1, 2, 3), array(4)))
 struct<structstojson(array(array(1, 2, 3), array(4))):string>
 -- !query 39 output
 [[1,2,3],[4]]
+
+
+-- !query 40
+select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'))
+-- !query 40 schema
+struct<schemaofjson({"c1":1}):string>
+-- !query 40 output
+struct<c1:string>
+
+
+-- !query 41
+select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 
'true', 'prefersDecimal', 'true'))
+-- !query 41 schema
+struct<schemaofjson({"c1":01, "c2":0.1}):string>
+-- !query 41 output
+struct<c1:bigint,c2:decimal(1,1)>

http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 853bc18..5cbf101 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import collection.JavaConverters._
+
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -402,6 +404,12 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
     assert(out.schema == expected)
   }
 
+  test("infers schemas using options") {
+    val df = spark.range(1)
+      .select(schema_of_json(lit("{a:1}"), Map("allowUnquotedFieldNames" -> 
"true").asJava))
+    checkAnswer(df, Seq(Row("struct<a:bigint>")))
+  }
+
   test("from_json - array of primitive types") {
     val df = Seq("[1, 2, 3]").toDF("a")
     val schema = new ArrayType(IntegerType, false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to