Repository: spark
Updated Branches:
  refs/heads/branch-2.4 26e1d3ef8 -> 40ed093b7


[SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's input json as literal only

The main purpose of `schema_of_json` is to be used in combination with 
`from_json` (to make up for the lack of schema inference), which takes its schema 
only as a literal; however, `schema_of_json` currently allows JSON input as 
non-literal expressions (e.g., a column).

This was mistakenly allowed - we don't have to take usages other than 
the main purpose into account for now.

This PR is a follow-up that only allows literals for `schema_of_json`'s JSON 
input. We can allow non-literal expressions later when they are needed or when 
there is a use case for them.

Unit tests were added.

Closes #22775 from HyukjinKwon/SPARK-25447-followup.

Lead-authored-by: hyukjinkwon <gurwls...@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 33e337c1180a12edf1ae97f0221e389f23192461)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ed093b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ed093b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ed093b

Branch: refs/heads/branch-2.4
Commit: 40ed093b7a8122afdcc8f2fc83bff45ca67a60e1
Parents: 26e1d3e
Author: hyukjinkwon <gurwls...@apache.org>
Authored: Fri Oct 26 22:14:43 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Oct 27 00:06:11 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 22 ++++++------
 .../catalyst/expressions/jsonExpressions.scala  | 21 +++++++++---
 .../scala/org/apache/spark/sql/functions.scala  | 16 +++++++--
 .../sql-tests/inputs/json-functions.sql         |  5 +++
 .../sql-tests/results/json-functions.sql.out    | 36 +++++++++++++++++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  2 +-
 6 files changed, 83 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9485c28..a59d5c9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2316,23 +2316,25 @@ def to_json(col, options={}):
 
 @ignore_unicode_prefix
 @since(2.4)
-def schema_of_json(col):
+def schema_of_json(json):
     """
-    Parses a column containing a JSON string and infers its schema in DDL 
format.
+    Parses a JSON string and infers its schema in DDL format.
 
-    :param col: string column in json format
+    :param json: a JSON string or a string literal containing a JSON string.
 
-    >>> from pyspark.sql.types import *
-    >>> data = [(1, '{"a": 1}')]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(schema_of_json(df.value).alias("json")).collect()
-    [Row(json=u'struct<a:bigint>')]
-    >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
+    >>> df = spark.range(1)
+    >>> df.select(schema_of_json('{"a": 0}').alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
     """
+    if isinstance(json, basestring):
+        col = _create_column_from_literal(json)
+    elif isinstance(json, Column):
+        col = _to_java_column(json)
+    else:
+        raise TypeError("schema argument should be a column or string")
 
     sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.schema_of_json(_to_java_column(col))
+    jc = sc._jvm.functions.schema_of_json(col)
     return Column(jc)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index bd9090a..6650e45 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -744,14 +744,27 @@ case class StructsToJson(
   """,
   since = "2.4.0")
 case class SchemaOfJson(child: Expression)
-  extends UnaryExpression with String2StringExpression with CodegenFallback {
+  extends UnaryExpression with CodegenFallback {
+
+  override def dataType: DataType = StringType
+
+  override def nullable: Boolean = false
 
   private val jsonOptions = new JSONOptions(Map.empty, "UTC")
   private val jsonFactory = new JsonFactory()
   jsonOptions.setJacksonOptions(jsonFactory)
 
-  override def convert(v: UTF8String): UTF8String = {
-    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, 
v)) { parser =>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
+
+  override def checkInputDataTypes(): TypeCheckResult = child match {
+    case Literal(s, StringType) if s != null => super.checkInputDataTypes()
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"The input json should be a string literal and not null; however, got 
${child.sql}.")
+  }
+
+  override def eval(v: InternalRow): Any = {
+    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, 
json)) { parser =>
       parser.nextToken()
       inferField(parser, jsonOptions)
     }
@@ -765,7 +778,7 @@ object JsonExprUtils {
   def evalSchemaExpr(exp: Expression): DataType = exp match {
     case Literal(s, StringType) => DataType.fromDDL(s.toString)
     case e @ SchemaOfJson(_: Literal) =>
-      val ddlSchema = e.eval().asInstanceOf[UTF8String]
+      val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
       DataType.fromDDL(ddlSchema.toString)
     case e => throw new AnalysisException(
       "Schema should be specified in DDL format as a string literal" +

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 9c4ad48..ac34ba6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3602,14 +3602,24 @@ object functions {
   }
 
   /**
-   * Parses a column containing a JSON string and infers its schema.
+   * Parses a JSON string and infers its schema in DDL format.
    *
-   * @param e a string column containing JSON data.
+   * @param json a JSON string.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def schema_of_json(json: String): Column = schema_of_json(lit(json))
+
+  /**
+   * Parses a JSON string and infers its schema in DDL format.
+   *
+   * @param json a string literal containing a JSON string.
    *
    * @group collection_funcs
    * @since 2.4.0
    */
-  def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
+  def schema_of_json(json: Column): Column = withExpr(new 
SchemaOfJson(json.expr))
 
   /**
    * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` 
or

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 0f22c0e..e391e93 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -56,3 +56,8 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
 select to_json(array('1', '2', '3'));
 select to_json(array(array(1, 2, 3), array(4)));
 
+select schema_of_json(null);
+CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 
1, "b": 2}', 'a');
+SELECT schema_of_json(jsonField) FROM jsonTable;
+-- Clean up
+DROP VIEW IF EXISTS jsonTable;

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index e550b43..94c1c97 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 40
+-- Number of queries: 44
 
 
 -- !query 0
@@ -370,3 +370,37 @@ select to_json(array(array(1, 2, 3), array(4)))
 struct<structstojson(array(array(1, 2, 3), array(4))):string>
 -- !query 39 output
 [[1,2,3],[4]]
+
+
+-- !query 40
+select schema_of_json(null)
+-- !query 40 schema
+struct<>
+-- !query 40 output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'schemaofjson(NULL)' due to data type mismatch: The input json 
should be a string literal and not null; however, got NULL.; line 1 pos 7
+
+
+-- !query 41
+CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 
1, "b": 2}', 'a')
+-- !query 41 schema
+struct<>
+-- !query 41 output
+
+
+
+-- !query 42
+SELECT schema_of_json(jsonField) FROM jsonTable
+-- !query 42 schema
+struct<>
+-- !query 42 output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'schemaofjson(jsontable.`jsonField`)' due to data type 
mismatch: The input json should be a string literal and not null; however, got 
jsontable.`jsonField`.; line 1 pos 7
+
+
+-- !query 43
+DROP VIEW IF EXISTS jsonTable
+-- !query 43 schema
+struct<>
+-- !query 43 output
+

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index fe4bf15..53ae1e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -391,7 +391,7 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
   test("SPARK-24709: infers schemas of json strings and pass them to 
from_json") {
     val in = Seq("""{"a": [1, 2, 3]}""").toDS()
-    val out = in.select(from_json('value, schema_of_json(lit("""{"a": 
[1]}"""))) as "parsed")
+    val out = in.select(from_json('value, schema_of_json("""{"a": [1]}""")) as 
"parsed")
     val expected = StructType(StructField(
       "parsed",
       StructType(StructField(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to