Repository: spark
Updated Branches:
  refs/heads/master 7d44bc264 -> 33e337c11


[SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's input json as literal only

## What changes were proposed in this pull request?

The main purpose of `schema_of_json` is the usage of combination with 
`from_json` (to make up the leak of schema inference) which takes its schema 
only as literal; however, currently `schema_of_json` allows JSON input as 
non-literal expressions (e.g, column).

This was mistakenly allowed - we don't have to take other usages rather then 
the main purpose into account for now.

This PR makes a followup to only allow literals for `schema_of_json`'s JSON 
input. We can allow non literal expressions later when it's needed or there are 
some usecase for it.

## How was this patch tested?

Unit tests were added.

Closes #22775 from HyukjinKwon/SPARK-25447-followup.

Lead-authored-by: hyukjinkwon <gurwls...@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33e337c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33e337c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33e337c1

Branch: refs/heads/master
Commit: 33e337c1180a12edf1ae97f0221e389f23192461
Parents: 7d44bc2
Author: hyukjinkwon <gurwls...@apache.org>
Authored: Fri Oct 26 22:14:43 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 26 22:14:43 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 22 ++++++------
 .../catalyst/expressions/jsonExpressions.scala  | 21 +++++++++---
 .../scala/org/apache/spark/sql/functions.scala  | 24 +++++++++----
 .../sql-tests/inputs/json-functions.sql         |  6 +++-
 .../sql-tests/results/json-functions.sql.out    | 36 +++++++++++++++++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  2 +-
 6 files changed, 87 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 739496b..ca2a256 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2335,30 +2335,32 @@ def to_json(col, options={}):
 
 @ignore_unicode_prefix
 @since(2.4)
-def schema_of_json(col, options={}):
+def schema_of_json(json, options={}):
     """
-    Parses a column containing a JSON string and infers its schema in DDL 
format.
+    Parses a JSON string and infers its schema in DDL format.
 
-    :param col: string column in json format
+    :param json: a JSON string or a string literal containing a JSON string.
     :param options: options to control parsing. accepts the same options as 
the JSON datasource
 
     .. versionchanged:: 3.0
        It accepts `options` parameter to control schema inferring.
 
-    >>> from pyspark.sql.types import *
-    >>> data = [(1, '{"a": 1}')]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(schema_of_json(df.value).alias("json")).collect()
-    [Row(json=u'struct<a:bigint>')]
+    >>> df = spark.range(1)
     >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
-    >>> schema = schema_of_json(lit('{a: 1}'), 
{'allowUnquotedFieldNames':'true'})
+    >>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
     >>> df.select(schema.alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
     """
+    if isinstance(json, basestring):
+        col = _create_column_from_literal(json)
+    elif isinstance(json, Column):
+        col = _to_java_column(json)
+    else:
+        raise TypeError("schema argument should be a column or string")
 
     sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options)
+    jc = sc._jvm.functions.schema_of_json(col, options)
     return Column(jc)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index e966924..77af590 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -742,7 +742,7 @@ case class StructsToJson(
 case class SchemaOfJson(
     child: Expression,
     options: Map[String, String])
-  extends UnaryExpression with String2StringExpression with CodegenFallback {
+  extends UnaryExpression with CodegenFallback {
 
   def this(child: Expression) = this(child, Map.empty[String, String])
 
@@ -750,6 +750,10 @@ case class SchemaOfJson(
       child = child,
       options = ExprUtils.convertToMapData(options))
 
+  override def dataType: DataType = StringType
+
+  override def nullable: Boolean = false
+
   @transient
   private lazy val jsonOptions = new JSONOptions(options, "UTC")
 
@@ -760,8 +764,17 @@ case class SchemaOfJson(
     factory
   }
 
-  override def convert(v: UTF8String): UTF8String = {
-    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, 
v)) { parser =>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
+
+  override def checkInputDataTypes(): TypeCheckResult = child match {
+    case Literal(s, StringType) if s != null => super.checkInputDataTypes()
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"The input json should be a string literal and not null; however, got 
${child.sql}.")
+  }
+
+  override def eval(v: InternalRow): Any = {
+    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, 
json)) { parser =>
       parser.nextToken()
       inferField(parser, jsonOptions)
     }
@@ -776,7 +789,7 @@ object JsonExprUtils {
   def evalSchemaExpr(exp: Expression): DataType = exp match {
     case Literal(s, StringType) => DataType.fromDDL(s.toString)
     case e @ SchemaOfJson(_: Literal, _) =>
-      val ddlSchema = e.eval().asInstanceOf[UTF8String]
+      val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
       DataType.fromDDL(ddlSchema.toString)
     case e => throw new AnalysisException(
       "Schema should be specified in DDL format as a string literal" +

http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 2748e64..757a322 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3626,19 +3626,29 @@ object functions {
   }
 
   /**
-   * Parses a column containing a JSON string and infers its schema.
+   * Parses a JSON string and infers its schema in DDL format.
    *
-   * @param e a string column containing JSON data.
+   * @param json a JSON string.
    *
    * @group collection_funcs
    * @since 2.4.0
    */
-  def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
+  def schema_of_json(json: String): Column = schema_of_json(lit(json))
 
   /**
-   * Parses a column containing a JSON string and infers its schema using 
options.
+   * Parses a JSON string and infers its schema in DDL format.
    *
-   * @param e a string column containing JSON data.
+   * @param json a string literal containing a JSON string.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def schema_of_json(json: Column): Column = withExpr(new 
SchemaOfJson(json.expr))
+
+  /**
+   * Parses a JSON string and infers its schema in DDL format using options.
+   *
+   * @param json a string column containing JSON data.
    * @param options options to control how the json is parsed. accepts the 
same options and the
    *                json data source. See [[DataFrameReader#json]].
    * @return a column with string literal containing schema in DDL format.
@@ -3646,8 +3656,8 @@ object functions {
    * @group collection_funcs
    * @since 3.0.0
    */
-  def schema_of_json(e: Column, options: java.util.Map[String, String]): 
Column = {
-    withExpr(SchemaOfJson(e.expr, options.asScala.toMap))
+  def schema_of_json(json: Column, options: java.util.Map[String, String]): 
Column = {
+    withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 8bfd7c0..6c14eee 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -55,4 +55,8 @@ select to_json(array(array(1, 2, 3), array(4)));
 -- infer schema of json literal using options
 select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'));
 select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 
'true', 'prefersDecimal', 'true'));
-
+select schema_of_json(null);
+CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 
1, "b": 2}', 'a');
+SELECT schema_of_json(jsonField) FROM jsonTable;
+-- Clean up
+DROP VIEW IF EXISTS jsonTable;

http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index c70a81e..ca0cd90 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 38
+-- Number of queries: 42
 
 
 -- !query 0
@@ -318,3 +318,37 @@ select schema_of_json('{"c1":01, "c2":0.1}', 
map('allowNumericLeadingZeros', 'tr
 struct<schema_of_json({"c1":01, "c2":0.1}):string>
 -- !query 37 output
 struct<c1:bigint,c2:decimal(1,1)>
+
+
+-- !query 38
+select schema_of_json(null)
+-- !query 38 schema
+struct<>
+-- !query 38 output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input 
json should be a string literal and not null; however, got NULL.; line 1 pos 7
+
+
+-- !query 39
+CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 
1, "b": 2}', 'a')
+-- !query 39 schema
+struct<>
+-- !query 39 output
+
+
+
+-- !query 40
+SELECT schema_of_json(jsonField) FROM jsonTable
+-- !query 40 schema
+struct<>
+-- !query 40 output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type 
mismatch: The input json should be a string literal and not null; however, got 
jsontable.`jsonField`.; line 1 pos 7
+
+
+-- !query 41
+DROP VIEW IF EXISTS jsonTable
+-- !query 41 schema
+struct<>
+-- !query 41 output
+

http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 797b274..2b09782 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -395,7 +395,7 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
   test("SPARK-24709: infers schemas of json strings and pass them to 
from_json") {
     val in = Seq("""{"a": [1, 2, 3]}""").toDS()
-    val out = in.select(from_json('value, schema_of_json(lit("""{"a": 
[1]}"""))) as "parsed")
+    val out = in.select(from_json('value, schema_of_json("""{"a": [1]}""")) as 
"parsed")
     val expected = StructType(StructField(
       "parsed",
       StructType(StructField(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to