Repository: spark Updated Branches: refs/heads/branch-2.4 26e1d3ef8 -> 40ed093b7
[SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's input json as literal only The main purpose of `schema_of_json` is to be used in combination with `from_json` (to make up for the lack of schema inference), which takes its schema only as a literal; however, `schema_of_json` currently allows its JSON input to be a non-literal expression (e.g., a column). This was allowed by mistake - for now, we don't need to take usages other than the main purpose into account. This PR is a follow-up that only allows literals for `schema_of_json`'s JSON input. We can allow non-literal expressions later if it's needed or if there are use cases for it. Unit tests were added. Closes #22775 from HyukjinKwon/SPARK-25447-followup. Lead-authored-by: hyukjinkwon <gurwls...@apache.org> Co-authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 33e337c1180a12edf1ae97f0221e389f23192461) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ed093b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ed093b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ed093b Branch: refs/heads/branch-2.4 Commit: 40ed093b7a8122afdcc8f2fc83bff45ca67a60e1 Parents: 26e1d3e Author: hyukjinkwon <gurwls...@apache.org> Authored: Fri Oct 26 22:14:43 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Sat Oct 27 00:06:11 2018 +0800 ---------------------------------------------------------------------- python/pyspark/sql/functions.py | 22 ++++++------ .../catalyst/expressions/jsonExpressions.scala | 21 +++++++++--- .../scala/org/apache/spark/sql/functions.scala | 16 +++++++-- .../sql-tests/inputs/json-functions.sql | 5 +++ .../sql-tests/results/json-functions.sql.out | 36 +++++++++++++++++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 6 files changed, 83 insertions(+), 19 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 9485c28..a59d5c9 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2316,23 +2316,25 @@ def to_json(col, options={}): @ignore_unicode_prefix @since(2.4) -def schema_of_json(col): +def schema_of_json(json): """ - Parses a column containing a JSON string and infers its schema in DDL format. + Parses a JSON string and infers its schema in DDL format. - :param col: string column in json format + :param json: a JSON string or a string literal containing a JSON string. - >>> from pyspark.sql.types import * - >>> data = [(1, '{"a": 1}')] - >>> df = spark.createDataFrame(data, ("key", "value")) - >>> df.select(schema_of_json(df.value).alias("json")).collect() - [Row(json=u'struct<a:bigint>')] - >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect() + >>> df = spark.range(1) + >>> df.select(schema_of_json('{"a": 0}').alias("json")).collect() [Row(json=u'struct<a:bigint>')] """ + if isinstance(json, basestring): + col = _create_column_from_literal(json) + elif isinstance(json, Column): + col = _to_java_column(json) + else: + raise TypeError("schema argument should be a column or string") sc = SparkContext._active_spark_context - jc = sc._jvm.functions.schema_of_json(_to_java_column(col)) + jc = sc._jvm.functions.schema_of_json(col) return Column(jc) http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index bd9090a..6650e45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -744,14 +744,27 @@ case class StructsToJson( """, since = "2.4.0") case class SchemaOfJson(child: Expression) - extends UnaryExpression with String2StringExpression with CodegenFallback { + extends UnaryExpression with CodegenFallback { + + override def dataType: DataType = StringType + + override def nullable: Boolean = false private val jsonOptions = new JSONOptions(Map.empty, "UTC") private val jsonFactory = new JsonFactory() jsonOptions.setJacksonOptions(jsonFactory) - override def convert(v: UTF8String): UTF8String = { - val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser => + @transient + private lazy val json = child.eval().asInstanceOf[UTF8String] + + override def checkInputDataTypes(): TypeCheckResult = child match { + case Literal(s, StringType) if s != null => super.checkInputDataTypes() + case _ => TypeCheckResult.TypeCheckFailure( + s"The input json should be a string literal and not null; however, got ${child.sql}.") + } + + override def eval(v: InternalRow): Any = { + val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parser.nextToken() inferField(parser, jsonOptions) } @@ -765,7 +778,7 @@ object JsonExprUtils { def evalSchemaExpr(exp: Expression): DataType = exp match { case Literal(s, StringType) => DataType.fromDDL(s.toString) case e @ SchemaOfJson(_: Literal) => - val ddlSchema = e.eval().asInstanceOf[UTF8String] + val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String] DataType.fromDDL(ddlSchema.toString) case e => throw new AnalysisException( "Schema should be specified in DDL format as a string literal" + 
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 9c4ad48..ac34ba6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3602,14 +3602,24 @@ object functions { } /** - * Parses a column containing a JSON string and infers its schema. + * Parses a JSON string and infers its schema in DDL format. * - * @param e a string column containing JSON data. + * @param json a JSON string. + * + * @group collection_funcs + * @since 2.4.0 + */ + def schema_of_json(json: String): Column = schema_of_json(lit(json)) + + /** + * Parses a JSON string and infers its schema in DDL format. + * + * @param json a string literal containing a JSON string. 
* * @group collection_funcs * @since 2.4.0 */ - def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr)) + def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr)) /** * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 0f22c0e..e391e93 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -56,3 +56,8 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>'); select to_json(array('1', '2', '3')); select to_json(array(array(1, 2, 3), array(4))); +select schema_of_json(null); +CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a'); +SELECT schema_of_json(jsonField) FROM jsonTable; +-- Clean up +DROP VIEW IF EXISTS jsonTable; http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index e550b43..94c1c97 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 40 +-- Number of queries: 44 -- !query 0 @@ -370,3 +370,37 @@ select to_json(array(array(1, 2, 3), array(4))) struct<structstojson(array(array(1, 2, 3), array(4))):string> -- !query 39 output 
[[1,2,3],[4]] + + +-- !query 40 +select schema_of_json(null) +-- !query 40 schema +struct<> +-- !query 40 output +org.apache.spark.sql.AnalysisException +cannot resolve 'schemaofjson(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7 + + +-- !query 41 +CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a') +-- !query 41 schema +struct<> +-- !query 41 output + + + +-- !query 42 +SELECT schema_of_json(jsonField) FROM jsonTable +-- !query 42 schema +struct<> +-- !query 42 output +org.apache.spark.sql.AnalysisException +cannot resolve 'schemaofjson(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7 + + +-- !query 43 +DROP VIEW IF EXISTS jsonTable +-- !query 43 schema +struct<> +-- !query 43 output + http://git-wip-us.apache.org/repos/asf/spark/blob/40ed093b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index fe4bf15..53ae1e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -391,7 +391,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("SPARK-24709: infers schemas of json strings and pass them to from_json") { val in = Seq("""{"a": [1, 2, 3]}""").toDS() - val out = in.select(from_json('value, schema_of_json(lit("""{"a": [1]}"""))) as "parsed") + val out = in.select(from_json('value, schema_of_json("""{"a": [1]}""")) as "parsed") val expected = StructType(StructField( "parsed", StructType(StructField( --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org