Repository: spark Updated Branches: refs/heads/master 1e437835e -> 1007cae20
[SPARK-25447][SQL] Support JSON options by schema_of_json() ## What changes were proposed in this pull request? In this PR, I propose to extend the `schema_of_json()` function to accept JSON options, since they can affect schema inference. The purpose is to support the same options that `from_json` can use during schema inference. ## How was this patch tested? Added SQL, Python and Scala tests (`JsonExpressionsSuite` and `JsonFunctionsSuite`) that check that JSON options are used. Closes #22442 from MaxGekk/schema_of_json-options. Authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1007cae2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1007cae2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1007cae2 Branch: refs/heads/master Commit: 1007cae20e8f566e7d7c25f0f81c9b84f352b6d5 Parents: 1e43783 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Sat Sep 29 17:53:30 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Sat Sep 29 17:53:30 2018 +0800 ---------------------------------------------------------------------- python/pyspark/sql/functions.py | 11 ++++++-- .../catalyst/expressions/jsonExpressions.scala | 28 +++++++++++++++----- .../expressions/JsonExpressionsSuite.scala | 12 +++++++-- .../scala/org/apache/spark/sql/functions.scala | 15 +++++++++++ .../sql-tests/inputs/json-functions.sql | 4 +++ .../sql-tests/results/json-functions.sql.out | 18 ++++++++++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 8 ++++++ 7 files changed, 85 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py 
b/python/pyspark/sql/functions.py index e5bc1ea..74f0685 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2348,11 +2348,15 @@ def to_json(col, options={}): @ignore_unicode_prefix @since(2.4) -def schema_of_json(col): +def schema_of_json(col, options={}): """ Parses a column containing a JSON string and infers its schema in DDL format. :param col: string column in json format + :param options: options to control parsing. accepts the same options as the JSON datasource + + .. versionchanged:: 2.5 + It accepts `options` parameter to control schema inferring. >>> from pyspark.sql.types import * >>> data = [(1, '{"a": 1}')] @@ -2361,10 +2365,13 @@ def schema_of_json(col): [Row(json=u'struct<a:bigint>')] >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect() [Row(json=u'struct<a:bigint>')] + >>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'}) + >>> df.select(schema.alias("json")).collect() + [Row(json=u'struct<a:bigint>')] """ sc = SparkContext._active_spark_context - jc = sc._jvm.functions.schema_of_json(_to_java_column(col)) + jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options) return Column(jc) http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index bd9090a..f5297dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -740,15 +740,31 @@ case class StructsToJson( examples = """ Examples: > SELECT _FUNC_('[{"col":0}]'); - array<struct<col:int>> + 
array<struct<col:bigint>> + > SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true')); + array<struct<col:bigint>> """, since = "2.4.0") -case class SchemaOfJson(child: Expression) +case class SchemaOfJson( + child: Expression, + options: Map[String, String]) extends UnaryExpression with String2StringExpression with CodegenFallback { - private val jsonOptions = new JSONOptions(Map.empty, "UTC") - private val jsonFactory = new JsonFactory() - jsonOptions.setJacksonOptions(jsonFactory) + def this(child: Expression) = this(child, Map.empty[String, String]) + + def this(child: Expression, options: Expression) = this( + child = child, + options = JsonExprUtils.convertToMapData(options)) + + @transient + private lazy val jsonOptions = new JSONOptions(options, "UTC") + + @transient + private lazy val jsonFactory = { + val factory = new JsonFactory() + jsonOptions.setJacksonOptions(factory) + factory + } override def convert(v: UTF8String): UTF8String = { val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser => @@ -764,7 +780,7 @@ object JsonExprUtils { def evalSchemaExpr(exp: Expression): DataType = exp match { case Literal(s, StringType) => DataType.fromDDL(s.toString) - case e @ SchemaOfJson(_: Literal) => + case e @ SchemaOfJson(_: Literal, _) => val ddlSchema = e.eval().asInstanceOf[UTF8String] DataType.fromDDL(ddlSchema.toString) case e => throw new AnalysisException( http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 0e9c8ab..34fdd0c 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -707,9 +707,17 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with } test("SPARK-24709: infer schema of json strings") { - checkEvaluation(SchemaOfJson(Literal.create("""{"col":0}""")), "struct<col:bigint>") + checkEvaluation(new SchemaOfJson(Literal.create("""{"col":0}""")), + "struct<col:bigint>") checkEvaluation( - SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")), + new SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")), "struct<col0:array<string>,col1:struct<col2:string>>") } + + test("infer schema of JSON strings by using options") { + checkEvaluation( + new SchemaOfJson(Literal.create("""{"col":01}"""), + CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), Literal.create("true")))), + "struct<col:bigint>") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 4c58e77..59a1fcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3612,6 +3612,21 @@ object functions { def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr)) /** + * Parses a column containing a JSON string and infers its schema using options. + * + * @param e a string column containing JSON data. + * @param options options to control how the json is parsed. accepts the same options and the + * json data source. See [[DataFrameReader#json]]. + * @return a column with string literal containing schema in DDL format. 
+ * + * @group collection_funcs + * @since 2.5.0 + */ + def schema_of_json(e: Column, options: java.util.Map[String, String]): Column = { + withExpr(SchemaOfJson(e.expr, options.asScala.toMap)) + } + + /** * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or * a `MapType` into a JSON string with the specified schema. * Throws an exception, in the case of an unsupported type. http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 0f22c0e..bdd1fe4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -56,3 +56,7 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>'); select to_json(array('1', '2', '3')); select to_json(array(array(1, 2, 3), array(4))); +-- infer schema of json literal using options +select schema_of_json('{"c1":1}', map('primitivesAsString', 'true')); +select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true')); + http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index e550b43..77e9000 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 40 +-- Number of queries: 42 -- !query 0 @@ -370,3 +370,19 @@ select 
to_json(array(array(1, 2, 3), array(4))) struct<structstojson(array(array(1, 2, 3), array(4))):string> -- !query 39 output [[1,2,3],[4]] + + +-- !query 40 +select schema_of_json('{"c1":1}', map('primitivesAsString', 'true')) +-- !query 40 schema +struct<schemaofjson({"c1":1}):string> +-- !query 40 output +struct<c1:string> + + +-- !query 41 +select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true')) +-- !query 41 schema +struct<schemaofjson({"c1":01, "c2":0.1}):string> +-- !query 41 output +struct<c1:bigint,c2:decimal(1,1)> http://git-wip-us.apache.org/repos/asf/spark/blob/1007cae2/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 853bc18..5cbf101 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import collection.JavaConverters._ + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -402,6 +404,12 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(out.schema == expected) } + test("infers schemas using options") { + val df = spark.range(1) + .select(schema_of_json(lit("{a:1}"), Map("allowUnquotedFieldNames" -> "true").asJava)) + checkAnswer(df, Seq(Row("struct<a:bigint>"))) + } + test("from_json - array of primitive types") { val df = Seq("[1, 2, 3]").toDF("a") val schema = new ArrayType(IntegerType, false) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org