Repository: spark Updated Branches: refs/heads/master 7d44bc264 -> 33e337c11
[SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's input json as literal only ## What changes were proposed in this pull request? The main purpose of `schema_of_json` is to be used in combination with `from_json` (to make up for the lack of schema inference), which takes its schema only as a literal; however, currently `schema_of_json` allows JSON input as non-literal expressions (e.g., a column). This was mistakenly allowed - we don't have to take usages other than the main purpose into account for now. This PR makes a followup to only allow literals for `schema_of_json`'s JSON input. We can allow non-literal expressions later when they are needed or there is a use case for them. ## How was this patch tested? Unit tests were added. Closes #22775 from HyukjinKwon/SPARK-25447-followup. Lead-authored-by: hyukjinkwon <gurwls...@apache.org> Co-authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33e337c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33e337c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33e337c1 Branch: refs/heads/master Commit: 33e337c1180a12edf1ae97f0221e389f23192461 Parents: 7d44bc2 Author: hyukjinkwon <gurwls...@apache.org> Authored: Fri Oct 26 22:14:43 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Oct 26 22:14:43 2018 +0800 ---------------------------------------------------------------------- python/pyspark/sql/functions.py | 22 ++++++------ .../catalyst/expressions/jsonExpressions.scala | 21 +++++++++--- .../scala/org/apache/spark/sql/functions.scala | 24 +++++++++---- .../sql-tests/inputs/json-functions.sql | 6 +++- .../sql-tests/results/json-functions.sql.out | 36 +++++++++++++++++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 2 +- 6 files changed, 87 insertions(+), 24 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 739496b..ca2a256 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2335,30 +2335,32 @@ def to_json(col, options={}): @ignore_unicode_prefix @since(2.4) -def schema_of_json(col, options={}): +def schema_of_json(json, options={}): """ - Parses a column containing a JSON string and infers its schema in DDL format. + Parses a JSON string and infers its schema in DDL format. - :param col: string column in json format + :param json: a JSON string or a string literal containing a JSON string. :param options: options to control parsing. accepts the same options as the JSON datasource .. versionchanged:: 3.0 It accepts `options` parameter to control schema inferring. 
- >>> from pyspark.sql.types import * - >>> data = [(1, '{"a": 1}')] - >>> df = spark.createDataFrame(data, ("key", "value")) - >>> df.select(schema_of_json(df.value).alias("json")).collect() - [Row(json=u'struct<a:bigint>')] + >>> df = spark.range(1) >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect() [Row(json=u'struct<a:bigint>')] - >>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'}) + >>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'}) >>> df.select(schema.alias("json")).collect() [Row(json=u'struct<a:bigint>')] """ + if isinstance(json, basestring): + col = _create_column_from_literal(json) + elif isinstance(json, Column): + col = _to_java_column(json) + else: + raise TypeError("schema argument should be a column or string") sc = SparkContext._active_spark_context - jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options) + jc = sc._jvm.functions.schema_of_json(col, options) return Column(jc) http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index e966924..77af590 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -742,7 +742,7 @@ case class StructsToJson( case class SchemaOfJson( child: Expression, options: Map[String, String]) - extends UnaryExpression with String2StringExpression with CodegenFallback { + extends UnaryExpression with CodegenFallback { def this(child: Expression) = this(child, Map.empty[String, String]) @@ -750,6 +750,10 @@ case class SchemaOfJson( 
child = child, options = ExprUtils.convertToMapData(options)) + override def dataType: DataType = StringType + + override def nullable: Boolean = false + @transient private lazy val jsonOptions = new JSONOptions(options, "UTC") @@ -760,8 +764,17 @@ case class SchemaOfJson( factory } - override def convert(v: UTF8String): UTF8String = { - val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser => + @transient + private lazy val json = child.eval().asInstanceOf[UTF8String] + + override def checkInputDataTypes(): TypeCheckResult = child match { + case Literal(s, StringType) if s != null => super.checkInputDataTypes() + case _ => TypeCheckResult.TypeCheckFailure( + s"The input json should be a string literal and not null; however, got ${child.sql}.") + } + + override def eval(v: InternalRow): Any = { + val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parser.nextToken() inferField(parser, jsonOptions) } @@ -776,7 +789,7 @@ object JsonExprUtils { def evalSchemaExpr(exp: Expression): DataType = exp match { case Literal(s, StringType) => DataType.fromDDL(s.toString) case e @ SchemaOfJson(_: Literal, _) => - val ddlSchema = e.eval().asInstanceOf[UTF8String] + val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String] DataType.fromDDL(ddlSchema.toString) case e => throw new AnalysisException( "Schema should be specified in DDL format as a string literal" + http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 2748e64..757a322 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3626,19 +3626,29 @@ object functions { } /** - * Parses a 
column containing a JSON string and infers its schema. + * Parses a JSON string and infers its schema in DDL format. * - * @param e a string column containing JSON data. + * @param json a JSON string. * * @group collection_funcs * @since 2.4.0 */ - def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr)) + def schema_of_json(json: String): Column = schema_of_json(lit(json)) /** - * Parses a column containing a JSON string and infers its schema using options. + * Parses a JSON string and infers its schema in DDL format. * - * @param e a string column containing JSON data. + * @param json a string literal containing a JSON string. + * + * @group collection_funcs + * @since 2.4.0 + */ + def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr)) + + /** + * Parses a JSON string and infers its schema in DDL format using options. + * + * @param json a string column containing JSON data. * @param options options to control how the json is parsed. accepts the same options and the * json data source. See [[DataFrameReader#json]]. * @return a column with string literal containing schema in DDL format. 
@@ -3646,8 +3656,8 @@ object functions { * @group collection_funcs * @since 3.0.0 */ - def schema_of_json(e: Column, options: java.util.Map[String, String]): Column = { - withExpr(SchemaOfJson(e.expr, options.asScala.toMap)) + def schema_of_json(json: Column, options: java.util.Map[String, String]): Column = { + withExpr(SchemaOfJson(json.expr, options.asScala.toMap)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 8bfd7c0..6c14eee 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -55,4 +55,8 @@ select to_json(array(array(1, 2, 3), array(4))); -- infer schema of json literal using options select schema_of_json('{"c1":1}', map('primitivesAsString', 'true')); select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true')); - +select schema_of_json(null); +CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a'); +SELECT schema_of_json(jsonField) FROM jsonTable; +-- Clean up +DROP VIEW IF EXISTS jsonTable; http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index c70a81e..ca0cd90 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by 
SQLQueryTestSuite --- Number of queries: 38 +-- Number of queries: 42 -- !query 0 @@ -318,3 +318,37 @@ select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'tr struct<schema_of_json({"c1":01, "c2":0.1}):string> -- !query 37 output struct<c1:bigint,c2:decimal(1,1)> + + +-- !query 38 +select schema_of_json(null) +-- !query 38 schema +struct<> +-- !query 38 output +org.apache.spark.sql.AnalysisException +cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7 + + +-- !query 39 +CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a') +-- !query 39 schema +struct<> +-- !query 39 output + + + +-- !query 40 +SELECT schema_of_json(jsonField) FROM jsonTable +-- !query 40 schema +struct<> +-- !query 40 output +org.apache.spark.sql.AnalysisException +cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7 + + +-- !query 41 +DROP VIEW IF EXISTS jsonTable +-- !query 41 schema +struct<> +-- !query 41 output + http://git-wip-us.apache.org/repos/asf/spark/blob/33e337c1/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 797b274..2b09782 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -395,7 +395,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("SPARK-24709: infers schemas of json strings and pass them to from_json") { val in = Seq("""{"a": [1, 2, 3]}""").toDS() - val out = in.select(from_json('value, 
schema_of_json(lit("""{"a": [1]}"""))) as "parsed") + val out = in.select(from_json('value, schema_of_json("""{"a": [1]}""")) as "parsed") val expected = StructType(StructField( "parsed", StructType(StructField( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org