[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Supports RDD of strings as input ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 @HyukjinKwon @viirya Thanks for your reviews. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Supports RDD of strings as...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r141232322

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,29 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            # see SPARK-22112
+            # There is no JVM API for creating a DataFrame from an RDD storing CSV.
--- End diff --

OK, thanks.
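The per-partition encoding step quoted in the diff above can be sketched in plain Python. This is a rough illustration, not the PySpark code itself: in Python 3, `str` plays the role of both `basestring` and `unicode` from the Python 2 diff, and `encode_partition` is a hypothetical name.

```python
def encode_partition(iterator):
    # Coerce every record to text, then to UTF-8 bytes, mirroring func()
    # in the diff: non-string records are stringified first, then all
    # text is encoded so the JVM side can decode it with BytesToString.
    for x in iterator:
        if not isinstance(x, str):
            x = str(x)
        yield x.encode("utf-8")

# One partition of mixed records: strings pass through, others are stringified.
part = iter(["a,1", 2, "b,3"])
print(list(encode_partition(part)))  # [b'a,1', b'2', b'b,3']
```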
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Supports RDD of strings as input ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 @HyukjinKwon I have updated the title. Thanks!
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 Hmm, it tests fine with Python 3.4 on my local machine. I'm not sure why the test fails intermittently... :(
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r141082457

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -336,6 +336,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
     ``inferSchema`` option or specify the schema explicitly using ``schema``.

     :param path: string, or list of strings, for input path(s).
--- End diff --

ok thanks :)
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r141080522

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,29 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            # [SPARK-22112]
+            # There is no JVM API for creating a DataFrame from an RDD storing CSV.
--- End diff --

OK, let me fix it. Thanks :)
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r141075069

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,25 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+            jdataset = self._spark._ssql_ctx.createDataset(
+                jrdd.rdd(),
+                self._spark._sc._jvm.Encoders.STRING())
--- End diff --

Yes, it works. I'll modify it.
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 This is odd. It runs fine with Python 3.5.2, but there seems to be some problem with Python 3.4. Let me try Python 3.4 locally.
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 OK, so should I create another JIRA for this issue?
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 @HyukjinKwon I think your approach works fine after fixing a variable name bug (`_jsqlContext` >> `_jssql_ctx`). Should we also modify the JSON part to be consistent with the CSV part?
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r141033740

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,22 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
             path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
--- End diff --

OK, this approach looks good. I'll try it. Thanks for your suggestion.
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19339#discussion_r140779203

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -456,6 +456,40 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }

   /**
+   * Loads a `JavaRDD[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
+   * it determines the columns as string types and it reads only the first line to determine the
+   * names and the number of fields.
+   *
+   * @param csvRDD input RDD with one CSV row per record
+   * @since 2.2.0
+   */
+  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
+  def csv(csvRDD: JavaRDD[String]): DataFrame = csv(csvRDD.rdd)
+
+  /**
+   * Loads an `RDD[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is disabled,
+   * it determines the columns as string types and it reads only the first line to determine the
+   * names and the number of fields.
+   *
+   * @param csvRDD input RDD with one CSV row per record
+   * @since 2.2.0
+   */
+  @deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
+  def csv(csvRDD: RDD[String]): DataFrame = {
--- End diff --

Thanks for your review :) Hmm, I followed `spark.read.json`'s approach when adding them. Although `json(jsonRDD: RDD[String])` has been deprecated, PySpark still uses it to create a `DataFrame`. I think adding a private wrapper in Scala may be better, because not only PySpark but also SparkR may need it.
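The default behaviour documented above (no `inferSchema`) can be sketched in plain Python: only the first record is consulted to fix the number of fields, and every column is typed as string with default `_c0`, `_c1`, ... names. This is a rough illustration under those stated assumptions, not Spark's actual parser; `infer_default_schema` is a hypothetical helper.

```python
def infer_default_schema(rows):
    # Without inferSchema, only the first line is read: it fixes the
    # number of fields, every column is typed as string, and default
    # names _c0, _c1, ... are assigned.
    first = rows[0].split(",")
    return [("_c%d" % i, "string") for i in range(len(first))]

print(infer_default_schema(["Joe,20", "Tom,30"]))
# [('_c0', 'string'), ('_c1', 'string')]
```

This matches the `df2.dtypes` output shown in the PR description for `ages.csv`.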
[GitHub] spark issue #19339: [SPARK-22112][PYSPARK] Add an API to create a DataFrame ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19339 @HyukjinKwon @viirya Could you review this PR? Thanks! :)
[GitHub] spark pull request #19339: [SPARK-22112][PYSPARK] Add an API to create a Dat...
GitHub user goldmedal opened a pull request: https://github.com/apache/spark/pull/19339

[SPARK-22112][PYSPARK] Add an API to create a DataFrame from RDD[String] storing CSV

## What changes were proposed in this pull request?

[SPARK-15463](https://issues.apache.org/jira/browse/SPARK-15463) added a method to the Scala API for creating a `DataFrame` from a `Dataset[String]` storing CSV, but PySpark has no `Dataset` to support this feature. Therefore, this PR adds an API to create a `DataFrame` from an `RDD[String]` storing CSV, which is also consistent with PySpark's `spark.read.json`. For example:

```
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
```

## How was this patch tested?

Added unit test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/goldmedal/spark SPARK-22112

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19339.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19339

commit d557892080c8d6ec33dd7a13f4b8cdad88b440b0
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-25T09:31:36Z
    add csv from `RDD[String]` API and related test case

commit baaa93f5e837cdba02922e183a3f81c287e19854
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-25T09:50:34Z
    fix test case

commit d4ef30abdda142a969400c9e6e11a089a5483385
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-25T11:59:08Z
    finish pyspark dataframe from rdd of csv string

commit 9bd4eed474fdfa20d5933558d519fb187694aa33
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-25T12:13:50Z
    modified comments

commit 7525b48d2b9b59b1d6ce74a145fc049cfce6529a
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-25T12:14:55Z
    modified comments
[GitHub] spark issue #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support c...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19223 Thanks @HyukjinKwon @felixcheung @viirya
[GitHub] spark issue #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support c...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19223 ok. I got it. Thanks :)
[GitHub] spark issue #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support c...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19223 @HyukjinKwon Thanks for triggering AppVeyor. In the normal case, will AppVeyor be triggered automatically?
[GitHub] spark pull request #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json su...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19223#discussion_r138856179

--- Diff: python/pyspark/sql/functions.py ---
@@ -1921,10 +1921,12 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
-    JSON string. Throws an exception, in the case of an unsupported type.
+    Converts a column containing a :class:`StructType`, :class:`ArrayType` of :class:`StructType`s,
--- End diff --

OK, I'll modify it. I'm not really familiar with Python, so thanks for your suggestions. :)
[GitHub] spark issue #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support c...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/19223 @HyukjinKwon @felixcheung @viirya I have finished the changes you suggested for this PR, and it has passed all tests. Please take a look when you are available. Thanks :)
[GitHub] spark pull request #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json su...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19223#discussion_r138800321

--- Diff: python/pyspark/sql/functions.py ---
@@ -1921,10 +1921,12 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
-    JSON string. Throws an exception, in the case of an unsupported type.
+    Converts a column containing a [[StructType]], [[ArrayType]] of [[StructType]]s,
+    a [[MapType]] or [[ArrayType]] of [[MapType]] into a JSON string.
+    Throws an exception, in the case of an unsupported type.
--- End diff --

OK, thanks.
[GitHub] spark pull request #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json su...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19223#discussion_r138799591

--- Diff: R/pkg/R/functions.R ---
@@ -1715,7 +1717,15 @@ setMethod("to_date",
 #'
 #' # Converts an array of structs into a JSON array
 #' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
-#' df2 <- mutate(df2, people_json = to_json(df2$people))}
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a JSON object
+#' df2 <- sql("SELECT map('name', 'Bob')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts an array of maps into a JSON array
+#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
--- End diff --

OK, thanks for the careful review :)
[GitHub] spark pull request #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json su...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/19223#discussion_r138799483

--- Diff: sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---
@@ -26,13 +26,13 @@ Extended Usage:
       {"time":"26/08/2015"}
       > SELECT to_json(array(named_struct('a', 1, 'b', 2));
       [{"a":1,"b":2}]
-      > SELECT to_json(map('a',named_struct('b',1)));
+      > SELECT to_json(map('a', named_struct('b', 1)));
--- End diff --

Hmm, I modified the `ExpressionDescription` of `StructsToJson` per @HyukjinKwon's suggestions, which weren't merged in the last PR. This is the test for `DESCRIBE FUNCTION EXTENDED to_json`, so I needed to regenerate the golden file. So this change isn't from `json-functions.sql`.
[GitHub] spark pull request #19223: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json su...
GitHub user goldmedal opened a pull request: https://github.com/apache/spark/pull/19223

[SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR

## What changes were proposed in this pull request?

In the previous work SPARK-21513, we allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string, but only in the Scala API. In this follow-up PR, we make Spark SQL support it for PySpark and SparkR, too. We also fix some small bugs and comments from the previous work.

### For PySpark

```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice"}')]
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```

### For SparkR

```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))

# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```

## How was this patch tested?

Added unit test cases.

cc @viirya @HyukjinKwon

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/goldmedal/spark SPARK-21513-fp-PySaprkAndSparkR

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19223.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19223

commit 071173c2a30486e6e462e85c9e25b04db9f1d8d6
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T18:41:13Z
    fix the coding style issue

commit 80fbb6b589c7ecadb1016ede701d363035793eae
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T18:41:54Z
    fix the logic operator using

commit 5e9a7266002918babedaf426c68b9e2b93e7b967
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T18:48:40Z
    add comments and test cases for sparkR

commit 6a3d374cac58e51b3c687b30e4bd924694c0ff91
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T18:52:08Z
    add comments and test cases for PySpark

commit 1f5b7cf86c19b6570dc64e4c8f12a215068dcc7f
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T18:55:30Z
    fix some bug and comments

commit 29e7323467c319d8e83a086d20f8bffde34a7b15
Author: goldmedal <liugs...@gmail.com>
Date: 2017-09-13T19:06:51Z
    re-generate golden file
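The behaviour the PR description shows can be approximated outside Spark with the standard `json` module. This is a rough analogy, not Spark's implementation; `to_json_like` is a hypothetical name, and `separators` is used only to match the compact output shape `to_json` produces.

```python
import json

def to_json_like(value):
    # Non-Spark analogy: serialize a dict (like a MapType value) or a
    # list of dicts (like an ArrayType of MapType) to a compact JSON
    # string with no spaces, the shape to_json emits for these inputs.
    return json.dumps(value, separators=(",", ":"))

print(to_json_like({"name": "Bob"}))
# {"name":"Bob"}
print(to_json_like([{"name": "Bob"}, {"name": "Alice"}]))
# [{"name":"Bob"},{"name":"Alice"}]
```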
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 @HyukjinKwon ok. I got it. Thanks =)
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 @HyukjinKwon OK, I'll work on R and Python. My JIRA id is 'goldmedal', too. Thanks for your review :) @viirya Thanks for your mentoring and review :)
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138501335

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -26,20 +26,50 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._

+/**
+ * `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
+ * Once it is initialized with `StructType`, it can be used to write out a struct or an array of
+ * struct. Once it is initialized with `MapType`, it can be used to write out a map or an array
+ * of map. An exception will be thrown if trying to write out a struct if it is initialized with
+ * a `MapType`, and vice verse.
+ */
 private[sql] class JacksonGenerator(
-    schema: StructType,
+    dataType: DataType,
     writer: Writer,
     options: JSONOptions) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
   private type ValueWriter = (SpecializedGetters, Int) => Unit

+  // `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
+  require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType],
--- End diff --

Oh, yes, you're right. This is my mistake. :(
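The mistake acknowledged above concerns the `require` line using the bitwise `|` where the short-circuiting logical or (`||` in Scala) was intended. The same distinction exists in Python (`|` vs `or`), sketched here as an analogy; both function names are hypothetical.

```python
def check_type_bitwise(x):
    # Bitwise | always evaluates both operands (no short-circuit),
    # even when the first test already decided the result.
    return isinstance(x, dict) | isinstance(x, list)

def check_type_logical(x):
    # Logical or short-circuits: the second test is skipped when the
    # first is already True -- the idiomatic form for boolean checks.
    return isinstance(x, dict) or isinstance(x, list)

# Both return the same boolean here; the difference is evaluation
# strategy and idiom, which is what the review comment points out.
print(check_type_bitwise({"a": 1}), check_type_logical([1]))
```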
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 @HyukjinKwon @viirya Sorry for updating this PR so late. Please take a look when you are available. Thanks :)
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138237482

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+    writeObject(writeFields(
+      fieldWriters = rootFieldWriters,
+      row = row,
+      schema = dataType.asInstanceOf[StructType]))
+  }
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+    fieldWriter = arrElementWriter,
+    array = array
+  ))

   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms a single `MapData` to JSON object using Jackson
    *
-   * @param array The array of rows to convert
+   * @param map a map to convert
    */
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+  def write(map: MapData): Unit = {
+    writeObject(writeMapData(
+      fieldWriter = mapElementWriter,
--- End diff --

OK, thanks for the review :) I'll update it tonight.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138110523

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.CharArrayWriter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.types._
+
+class JacksonGeneratorSuite extends SparkFunSuite {
+
+  val gmtId = DateTimeUtils.TimeZoneGMT.getID
+  val option = new JSONOptions(Map.empty, gmtId)
+
+  test("initial with StructType and write out a row") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = InternalRow(1)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with StructType and write out rows") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{"a":1},{"a":2}]""")
+  }
+
+  test("initial with StructType and write out an array with single empty row") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(InternalRow(null) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{}]""")
+  }
+
+  test("initial with StructType and write out an empty array") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[]""")
+  }
+
+  test("initial with Map and write out a map data") {
+    val dataType = MapType(StringType, IntegerType)
+    val input = ArrayBasedMapData(Map("a" -> 1))
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with Map and write out an array of maps") {
+    val dataType = MapType(StringType, IntegerType)
+    val input = new GenericArrayData(
+      ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{"a":1},{"b":2}]""")
+  }
+
+  test("error handling: initial with StructType but error calling write a map") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = ArrayBasedMapData(Map("a" -> 1))
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType,
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138094337

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.CharArrayWriter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.types._
+
+class JacksonGeneratorSuite extends SparkFunSuite {
+
+  val gmtId = DateTimeUtils.TimeZoneGMT.getID
+  val option = new JSONOptions(Map.empty, gmtId)
+
+  test("initial with StructType and write out a row") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = InternalRow(1)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with StructType and write out rows") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{"a":1},{"a":2}]""")
+  }
+
+  test("initial with StructType and write out an array with single empty row") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(InternalRow(null) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{}]""")
+  }
+
+  test("initial with StructType and write out an empty array") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = new GenericArrayData(Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[]""")
+  }
+
+  test("initial with Map and write out a map data") {
+    val dataType = MapType(StringType, IntegerType)
+    val input = ArrayBasedMapData(Map("a" -> 1))
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with Map and write out an array of maps") {
+    val dataType = MapType(StringType, IntegerType)
+    val input = new GenericArrayData(
+      ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil)
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    gen.flush()
+    assert(writer.toString === """[{"a":1},{"b":2}]""")
+  }
+
+  test("error handling: initial with StructType but error calling write a map") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = ArrayBasedMapData(Map("a" -> 1))
+    val writer = new CharArrayWriter()
+    val gen = new JacksonGenerator(dataType,
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138087980

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875

@HyukjinKwon We have finished supporting `MapType` and `ArrayType` of `MapType`s. Please take a look when you are available. Thanks :)

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
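For readers following the thread, the input types `to_json` is being taught to accept can be sketched with a self-contained dispatch in plain Scala. These case classes are hypothetical stand-ins, not Spark's real `DataType` hierarchy:

```scala
// Minimal stand-ins for Spark's type hierarchy (NOT the real classes).
sealed trait DataType
case class StructType(fieldNames: List[String]) extends DataType
case class MapType(keyType: String, valueType: String) extends DataType
case class ArrayType(elementType: DataType) extends DataType

// to_json-style acceptance after this PR: structs, maps, and arrays of either.
def supported(dt: DataType): Boolean = dt match {
  case _: StructType            => true
  case _: MapType               => true
  case ArrayType(_: StructType) => true
  case ArrayType(_: MapType)    => true
  case _                        => false
}

val mapOk = supported(MapType("string", "int"))
val arrayOfMapOk = supported(ArrayType(MapType("string", "int")))
```

The nested-array case deliberately falls through to `false`, matching the thread's scope of struct, map, and arrays thereof.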
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137987843

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -193,14 +228,32 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = dataType match {
+    case st: StructType =>
+      writeObject(writeFields(row, st, rootFieldWriters))
+    case _ => throw new UnsupportedOperationException(
--- End diff --

It will throw `ClassCastException`.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137987296

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---

Oh, I misunderstood. I thought you meant the matching in `rootFieldWriters`. So do we need to keep both of them?
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137984767

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -677,14 +696,42 @@ case class StructsToJson(
   override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
     case _: StructType | ArrayType(_: StructType, _) =>
       try {
-        JacksonUtils.verifySchema(rowSchema)
+        JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    case ArrayType(mt: MapType, _) =>
+      try {
+        val st = StructType(StructField("a", mt) :: Nil)
+        JacksonUtils.verifySchema(st)
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    case MapType(_: DataType, st: StructType, _: Boolean) =>
--- End diff --

@viirya I think if we have `case mt: MapType`, we don't need this pattern to verify the schema, right?
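The point can be illustrated with a tiny self-contained analogue (these case classes are hypothetical stand-ins, not Spark's types): if schema verification recurses into a map's value type, a single generic `MapType` case already covers maps whose values are structs, making the more specific pattern redundant.

```scala
// Hypothetical miniature of recursive schema verification.
sealed trait MiniType
case object MiniInt extends MiniType
case class MiniStruct(fields: List[MiniType]) extends MiniType
case class MiniMap(valueType: MiniType) extends MiniType

def verify(dt: MiniType): Boolean = dt match {
  case MiniInt            => true
  case MiniStruct(fields) => fields.forall(verify)
  // One generic map case: recursing into the value type also handles
  // map-of-struct, so no separate MiniMap(MiniStruct(...)) pattern is needed.
  case MiniMap(valueType) => verify(valueType)
}
```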
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137980084

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -677,14 +696,27 @@ case class StructsToJson(
   override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
     case _: StructType | ArrayType(_: StructType, _) =>
       try {
-        JacksonUtils.verifySchema(rowSchema)
+        JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    case ArrayType(_: MapType, _) =>
+      TypeCheckResult.TypeCheckSuccess
+    case MapType(_: DataType, st: StructType, _: Boolean) =>
+      try {
+        JacksonUtils.verifySchema(st)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
           TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
+    case _: MapType =>
--- End diff --

OK. I'll just leave a TODO comment for it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137979014

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---

Oh yes. It's a tricky way, but it makes sense. =D
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137978728

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---

OK, got it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137978096

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---

@viirya But `JacksonUtils.verifySchema` only verifies a `StructType`, so do we need to add a new one?
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137977643

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.CharArrayWriter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class JacksonGeneratorSuite extends SparkFunSuite {
+
+  val gmtId = DateTimeUtils.TimeZoneGMT.getID
+  val option = new JSONOptions(Map.empty, gmtId)
+  val writer = new CharArrayWriter()
+  def getAndReset(gen: JacksonGenerator): UTF8String = {
+    gen.flush()
+    val json = writer.toString
+    writer.reset()
+    UTF8String.fromString(json)
+  }
+
+  test("initial with StructType and write out a row") {
+    val dataType = StructType(StructField("a", IntegerType) :: Nil)
+    val input = InternalRow(1)
+    val gen = new JacksonGenerator(dataType, writer, option)
+    gen.write(input)
+    assert(getAndReset(gen) === UTF8String.fromString("""{"a":1}"""))
--- End diff --

@viirya I think we also need to do `gen.flush()`, right? So maybe we can keep `getAndReset` and modify it as below. Would that be better?
```
def getAndReset(gen: JacksonGenerator, writer: Writer): String = {
  gen.flush()
  writer.toString
}
```
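The helper being discussed can be exercised in isolation with a plain `java.io.CharArrayWriter`. Since `JacksonGenerator` is Spark-internal, the sketch below uses a hypothetical stand-in class for it; only the `flush()` behavior matters here:

```scala
import java.io.{CharArrayWriter, Writer}

// Hypothetical stand-in for the Spark-internal JacksonGenerator.
class FakeGen(writer: Writer) {
  def write(json: String): Unit = writer.write(json)
  def flush(): Unit = writer.flush()
}

// The helper shape proposed in the comment above: flush, read, then reset
// the shared buffer so the next test starts from an empty writer.
def getAndReset(gen: FakeGen, writer: CharArrayWriter): String = {
  gen.flush()
  val json = writer.toString
  writer.reset()
  json
}

val writer = new CharArrayWriter()
val gen = new FakeGen(writer)
gen.write("""{"a":1}""")
val result = getAndReset(gen, writer)
```

Passing the writer as a parameter (rather than capturing a suite-level field) keeps the helper reusable across independently constructed writers.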
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137977153

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---

Yes, maybe we can just use `writer.toString` and assert against a normal string.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137977056

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---

OK, I got it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137976953

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---

@viirya I found that if we don't check it here, it will throw `ClassCastException` from `writeObject()` first. So maybe we don't need to check the type in `rootFieldWriters`?
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137975716

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---

OK. I'll remove it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137959771

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -193,14 +228,58 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = dataType match {
+    case st: StructType =>
+      writeObject(writeFields(row, st, rootFieldWriters))
+    case _ => throw new UnsupportedOperationException(
+      s"`JacksonGenerator` can only be used to write out a row when initialized with `StructType`.")
+  }

   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson
    *
-   * @param array The array of rows to convert
+   * @param array The array of rows or maps to convert
    */
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+  def write(array: ArrayData): Unit = dataType match {
+    case st: StructType =>
+      try {
--- End diff --

I think you're right. If the caller calls it incorrectly, it will also throw a `ClassCastException` to remind the caller, so it's unnecessary to check it. I'll remove it. Thanks.
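A minimal self-contained sketch of why the explicit up-front check is redundant: a mismatched element cast already fails loudly on its own. This uses plain Scala collections, not Spark's `ArrayData`:

```scala
// Plain-Scala analogue: casting an element to the wrong type already raises
// ClassCastException, so a separate up-front type check adds little.
val elements: Array[Any] = Array(Map("a" -> 1))

val failedAsExpected =
  try {
    // Pretend the generator expected row-like data (Seq[Int] here) but got a map.
    elements(0).asInstanceOf[Seq[Int]]
    false
  } catch {
    case _: ClassCastException => true
  }
```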
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137958902

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -193,14 +228,58 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = dataType match {
+    case st: StructType =>
+      writeObject(writeFields(row, st, rootFieldWriters))
+    case _ => throw new UnsupportedOperationException(
+      s"`JacksonGenerator` can only be used to write out a row when initialized with `StructType`.")
+  }

   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson
    *
-   * @param array The array of rows to convert
+   * @param array The array of rows or maps to convert
    */
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+  def write(array: ArrayData): Unit = dataType match {
+    case st: StructType =>
+      try {
+        if (array.numElements() > 0) {
+          array.getStruct(0, st.length)
+        }
+      } catch {
+        case cce: ClassCastException =>
+          throw new UnsupportedOperationException(
+            s"`JacksonGenerator` can only be used to write out an array of struct " +
+              s"when initialized with `StructType`")
+      }
+      writeArray(writeArrayData(array, arrElementWriter))
+    case _: MapType =>
+      try {
+        if (array.numElements() > 0) {
+          array.getMap(0)
+        }
+      } catch {
+        case cce: ClassCastException =>
+          throw new UnsupportedOperationException(
+            s"`JacksonGenerator` can only be used to write out an array of map when initialized" +
+              s"with `MapType`")
+      }
+      writeArray(writeArrayData(array, arrElementWriter))
+    case _ => throw new UnsupportedOperationException(
--- End diff --

Yes, you're right. I'll remove this pattern case.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137710147

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -26,20 +26,50 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._

+// `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
+// Once it is initialized with `StructType`, it can be used to write out a struct or an array of
+// struct. Once it is initialized with `MapType`, it can be used to write out a map. An exception
+// will be thrown if trying to write out a struct if it is initialized with a `MapType`,
+// and vice verse.
--- End diff --

ok. I'll modify it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137288735

--- Diff: sql/core/src/test/resources/sql-tests/results/cross-join.sql.out ---
@@ -128,6 +128,7 @@
 two 2 two 2 one 1 two 2 two2 two 2 three 3 two 2 two2 two 2 two 2 two 2
+
--- End diff --

ummm, that's weird. I'm not sure, but maybe this whitespace-only line was generated when I regenerated the golden file. I'll revert it. =P
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137285989

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -669,14 +679,25 @@ case class StructsToJson(
   override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
     case _: StructType | ArrayType(_: StructType, _) =>
       try {
-        JacksonUtils.verifySchema(rowSchema)
+        JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException =>
+          TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    case MapType(_: DataType, st: StructType, _: Boolean) =>
+      try {
+        JacksonUtils.verifySchema(st)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
           TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
+    case _: MapType =>
+      TypeCheckResult.TypeCheckSuccess
--- End diff --

OK, thanks for the review =)
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137282128

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -22,24 +22,49 @@ import java.io.Writer

 import com.fasterxml.jackson.core._

 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._

 private[sql] class JacksonGenerator(
-    schema: StructType,
+    dataType: DataType,
     writer: Writer,
     options: JSONOptions) {

   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
   private type ValueWriter = (SpecializedGetters, Int) => Unit

+  // `JackGenerator` only supports to write out a struct, an array of struct or an arbitrary map
--- End diff --

ok. I'll change it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137273266

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -22,24 +22,50 @@ import java.io.Writer

 import com.fasterxml.jackson.core._

 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._

 private[sql] class JacksonGenerator(
-    schema: StructType,
+    dataType: DataType,
     writer: Writer,
     options: JSONOptions) {

   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
   private type ValueWriter = (SpecializedGetters, Int) => Unit

+  // `JackGenerator` only supports to write out a struct, an array of struct or an arbitrary map
+  dataType match {
+    case _: StructType | _: MapType =>
+      TypeCheckResult.TypeCheckSuccess
--- End diff --

ok
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137269586

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -127,7 +153,7 @@ private[sql] class JacksonGenerator(
     (row: SpecializedGetters, ordinal: Int) =>
       val v = row.get(ordinal, dataType)
       sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
-          s"with the type of $dataType to JSON.")
+        s"with the type of $dataType to JSON.")
--- End diff --

ummm, I think it may have been modified by the IDE automatically. I'll revert it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137048876 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -27,21 +27,45 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ private[sql] class JacksonGenerator( -schema: StructType, +childType: DataType, +rowSchema: StructType, --- End diff -- Thanks for the review =). I'll follow your suggestion and change it. However, I think `JacksonGenerator` only supports writing out an arbitrary map; it doesn't support writing out an array of maps yet. Should I fix that, or is that perhaps a separate issue about supporting arbitrary arrays? Also, should I add a check around calls to the `write(row: InternalRow)` API?
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r137044817 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -27,21 +27,45 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ private[sql] class JacksonGenerator( -schema: StructType, +childType: DataType, --- End diff -- OK, I'll revert it and use `rowSchema` to carry what `MapType` needs.
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 retest this please
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r134469528 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -202,5 +203,9 @@ private[sql] class JacksonGenerator( */ def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + def write(map: MapData, mapType: MapType): Unit = { --- End diff -- Sorry for the late reply. The other `write` functions don't take any type because `JacksonGenerator` has a `schema` member; since `writeMapData` needs a `mapType`, I made the `MapType` `write` function take one. I tried to fix this by moving the `childType` matching into `JacksonGenerator` in the new commit a254f89, but I'm not sure this approach is better. @HyukjinKwon @viirya What do you think?
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r132615092 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala --- @@ -186,6 +186,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row("""[{"_1":1}]""") :: Nil) } + test("to_json - map") { +val df1 = Seq(Map("a" -> Tuple1(1))).toDF("a") +val df2 = Seq(Map(Tuple1(1) -> Tuple1(1))).toDF("a") + +checkAnswer( + df1.select(to_json($"a")), + Row("""{"a":{"_1":1}}""") :: Nil) +checkAnswer( + df2.select(to_json($"a")), + Row("""{"[0,1]":{"_1":1}}""") :: Nil) --- End diff -- Actually, I'm not sure what the expected answer is, but I got `[0,1]` using
```
scala> Seq(Tuple1(Tuple1(Map(Tuple1(1) -> Tuple1(1))))).toDF("a").select(to_json($"a")).show()
+--------------------+
|    structstojson(a)|
+--------------------+
|[{"_1":{"[0,1]":{...|
+--------------------+
```
so I think this answer should be correct.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r131952135 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -659,13 +660,19 @@ case class StructsToJson( (arr: Any) => gen.write(arr.asInstanceOf[ArrayData]) getAndReset() + case MapType(_: DataType, _: StructType, _: Boolean) => +(map: Any) => + val mapType = child.dataType.asInstanceOf[MapType] + gen.write(map.asInstanceOf[MapData], mapType) + getAndReset() } } override def dataType: DataType = StringType override def checkInputDataTypes(): TypeCheckResult = child.dataType match { -case _: StructType | ArrayType(_: StructType, _) => +case _: StructType | ArrayType(_: StructType, _) | + MapType(_: DataType, _: StructType, _: Boolean) => try { JacksonUtils.verifySchema(rowSchema) TypeCheckResult.TypeCheckSuccess --- End diff -- @HyukjinKwon Yeah, you're right. I'll add a message about the map type.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r131951764 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -202,5 +202,9 @@ private[sql] class JacksonGenerator( */ def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + def write(map: MapData, mapType: MapType): Unit = { +writeObject(writeMapData(map, mapType, makeWriter(mapType.valueType))) --- End diff -- @viirya ok, got it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r131946027 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -659,13 +660,19 @@ case class StructsToJson( (arr: Any) => gen.write(arr.asInstanceOf[ArrayData]) getAndReset() + case MapType(_: DataType, _: StructType, _: Boolean) => +(map: Any) => + val mapType = child.dataType.asInstanceOf[MapType] + gen.write(map.asInstanceOf[MapData], mapType) + getAndReset() } } override def dataType: DataType = StringType override def checkInputDataTypes(): TypeCheckResult = child.dataType match { -case _: StructType | ArrayType(_: StructType, _) => +case _: StructType | ArrayType(_: StructType, _) | + MapType(_: DataType, _: StructType, _: Boolean) => --- End diff -- Yeah, just as @viirya said, this expression is named `StructsToJson`. Another reason is that it uses `JacksonGenerator`, which needs a `rowSchema` at initialization to generate the JSON string; therefore, it's restricted to `StructType` here. Or should we create another expression to support arbitrary map types?
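The three accepted input shapes discussed here (a struct, an array of structs, or a map with struct values) can be sketched with the same kind of type-pattern match as the diff above. This is illustrative, self-contained Scala using simplified stand-in types, not Spark's actual `checkInputDataTypes`:

```scala
// Simplified stand-ins for Spark's DataType hierarchy (illustrative only).
sealed trait DataType
case class StructType(fieldNames: Seq[String]) extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class MapType(keyType: DataType, valueType: DataType) extends DataType
case object IntegerType extends DataType

// Mirrors the shape of the check in the diff: only a struct, an array of
// structs, or a map whose values are structs pass the type check.
def isSupportedInput(dt: DataType): Boolean = dt match {
  case _: StructType             => true
  case ArrayType(_: StructType)  => true
  case MapType(_, _: StructType) => true
  case _                         => false
}
```

Under these stand-ins, `isSupportedInput(MapType(IntegerType, StructType(Seq("a"))))` is true, while a bare `IntegerType` or an array of maps is rejected, which is exactly the restriction being debated in this thread.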
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18875 @HyukjinKwon Thanks for the reference! I'll check it.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
GitHub user goldmedal opened a pull request: https://github.com/apache/spark/pull/18875 [SPARK-21513][SQL] Allow UDF to_json support converting MapType to json # What changes were proposed in this pull request? The UDF `to_json` currently only supports converting a `StructType`, or an `ArrayType` of `StructType`s, to a JSON output string. Following the discussion on JIRA SPARK-21513, this change allows `to_json` to also convert a `MapType<DataType, StructType>` to a JSON output string. # How was this patch tested? Added unit test cases. cc @viirya @HyukjinKwon You can merge this pull request into a Git repository by running: $ git pull https://github.com/goldmedal/spark SPARK-21513 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18875.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18875
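With this change in place, usage would look like the following spark-shell sketch. The expected result is taken from the PR's own unit test in `JsonFunctionsSuite` (the `df1` case), so the output shown is what the tests assert, not an independently verified run:

```scala
// Assumes a spark-shell / SparkSession context providing `spark`.
import org.apache.spark.sql.functions.to_json
import spark.implicits._

// A column of type MapType<String, StructType> (struct with one int field _1).
val df = Seq(Map("a" -> Tuple1(1))).toDF("m")

// With this PR, to_json can serialize the map column directly.
df.select(to_json($"m")).collect()
// per the PR's tests, the resulting JSON string is: {"a":{"_1":1}}
```

The same mechanism covers non-string map keys; the PR's tests show a struct key being rendered with its internal representation (e.g. `{"[0,1]":{"_1":1}}`).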
[GitHub] spark pull request #18854: [SPARK-21629][SQL][WIP] Fix Or nullability
Github user goldmedal closed the pull request at: https://github.com/apache/spark/pull/18854
[GitHub] spark issue #18854: [SPARK-21629][SQL][WIP] Fix Or nullability
Github user goldmedal commented on the issue: https://github.com/apache/spark/pull/18854 @gatorsmile @viirya Thanks a lot. You are right. I'll close it for now.
[GitHub] spark pull request #18854: [SPARK-21629][SQL][WIP] Fix Or nullability
GitHub user goldmedal opened a pull request: https://github.com/apache/spark/pull/18854 [SPARK-21629][SQL][WIP] Fix Or nullability ## What changes were proposed in this pull request? Override `nullable` of the `Or` expression to make sure its nullability behavior is correct. ## How was this patch tested? I'll add test cases later. cc @viirya You can merge this pull request into a Git repository by running: $ git pull https://github.com/goldmedal/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18854.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18854 commit be711d003f2831c1ab4b69fc2a119aefbba08743 Author: goldmedal <liugs...@gmail.com> Date: 2017-08-05T14:44:04Z Fix Or nullability