Repository: spark Updated Branches: refs/heads/master 1a7abf3f4 -> 39399f40b
[SPARK-25638][SQL] Adding new function - to_csv() ## What changes were proposed in this pull request? The new function takes a struct and converts it to a CSV string using the passed CSV options. It accepts the same CSV options as the CSV data source does. ## How was this patch tested? Added `CsvExpressionsSuite`, `CsvFunctionsSuite` as well as R, Python and SQL tests similar to tests for `to_json()` Closes #22626 from MaxGekk/to_csv. Lead-authored-by: Maxim Gekk <max.g...@gmail.com> Co-authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39399f40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39399f40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39399f40 Branch: refs/heads/master Commit: 39399f40b861f7d8e60d0e25d2f8801343477834 Parents: 1a7abf3 Author: Maxim Gekk <max.g...@gmail.com> Authored: Sun Nov 4 14:57:38 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Sun Nov 4 14:57:38 2018 +0800 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 31 +++++-- R/pkg/R/generics.R | 4 + R/pkg/tests/fulltests/test_sparkSQL.R | 5 ++ python/pyspark/sql/functions.py | 22 +++++ .../catalyst/analysis/FunctionRegistry.scala | 3 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 93 ++++++++++++++++++++ .../catalyst/expressions/csvExpressions.scala | 67 ++++++++++++++ .../expressions/CsvExpressionsSuite.scala | 44 +++++++++ .../datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/csv/UnivocityGenerator.scala | 90 ------------------- .../scala/org/apache/spark/sql/functions.scala | 26 ++++++ .../sql-tests/inputs/csv-functions.sql | 6 ++ .../sql-tests/results/csv-functions.sql.out | 36 +++++++- .../apache/spark/sql/CsvFunctionsSuite.scala | 14 ++- 15 files changed, 345 insertions(+), 99 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index f9f556e..9d4f05a 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -380,6 +380,7 @@ exportMethods("%<=>%", "tanh", "toDegrees", "toRadians", + "to_csv", "to_date", "to_json", "to_timestamp", http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/R/functions.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index d2ca1d6..9292363 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -187,6 +187,7 @@ NULL #' \itemize{ #' \item \code{to_json}: it is the column containing the struct, array of the structs, #' the map or array of maps. +#' \item \code{to_csv}: it is the column containing the struct. #' \item \code{from_json}: it is the column containing the JSON string. #' \item \code{from_csv}: it is the column containing the CSV string. #' } @@ -204,11 +205,11 @@ NULL #' also supported for the schema. #' \item \code{from_csv}: a DDL-formatted string #' } -#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains -#' additional named properties to control how it is converted, accepts the same -#' options as the JSON data source. Additionally \code{to_json} supports the "pretty" -#' option which enables pretty JSON generation. In \code{arrays_zip}, this contains -#' additional Columns of arrays to be merged. +#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json}, +#' this contains additional named properties to control how it is converted, accepts +#' the same options as the JSON/CSV data source. Additionally \code{to_json} supports +#' the "pretty" option which enables pretty JSON generation. 
In \code{arrays_zip}, +#' this contains additional Columns of arrays to be merged. #' @name column_collection_functions #' @rdname column_collection_functions #' @family collection functions @@ -1741,6 +1742,26 @@ setMethod("to_json", signature(x = "Column"), }) #' @details +#' \code{to_csv}: Converts a column containing a \code{structType} into a Column of CSV string. +#' Resolving the Column can fail if an unsupported type is encountered. +#' +#' @rdname column_collection_functions +#' @aliases to_csv to_csv,Column-method +#' @examples +#' +#' \dontrun{ +#' # Converts a struct into a CSV string +#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d") +#' select(df2, to_csv(df2$d, dateFormat = 'dd/MM/yyyy'))} +#' @note to_csv since 3.0.0 +setMethod("to_csv", signature(x = "Column"), + function(x, ...) { + options <- varargsToStrEnv(...) + jc <- callJStatic("org.apache.spark.sql.functions", "to_csv", x@jc, options) + column(jc) + }) + +#' @details #' \code{to_timestamp}: Converts the column into a TimestampType. You may optionally specify #' a format according to the rules in: #' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}. http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 76e17c1..463102c 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1303,6 +1303,10 @@ setGeneric("to_date", function(x, format) { standardGeneric("to_date") }) #' @name NULL setGeneric("to_json", function(x, ...) { standardGeneric("to_json") }) +#' @rdname column_collection_functions +#' @name NULL +setGeneric("to_csv", function(x, ...) 
{ standardGeneric("to_csv") }) + #' @rdname column_datetime_functions #' @name NULL setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") }) http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 58e0a54..faec387 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1689,6 +1689,11 @@ test_that("column functions", { expect_equal(arr$arrcol[[1]][[2]]$name, "Alice") } + # Test to_csv() + df <- sql("SELECT named_struct('name', 'Bob') as people") + j <- collect(select(df, alias(to_csv(df$people), "csv"))) + expect_equal(j[order(j$csv), ][1], "Bob") + # Test create_array() and create_map() df <- as.DataFrame(data.frame( x = c(1.0, 2.0), y = c(-1.0, 3.0), z = c(-2.0, 5.0) http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index beb1a06..24824ef 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2391,6 +2391,28 @@ def schema_of_csv(csv, options={}): return Column(jc) +@ignore_unicode_prefix +@since(3.0) +def to_csv(col, options={}): + """ + Converts a column containing a :class:`StructType` into a CSV string. + Throws an exception, in the case of an unsupported type. + + :param col: name of column containing a struct. + :param options: options to control converting. accepts the same options as the CSV datasource. 
+ + >>> from pyspark.sql import Row + >>> data = [(1, Row(name='Alice', age=2))] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_csv(df.value).alias("csv")).collect() + [Row(csv=u'2,Alice')] + """ + + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.to_csv(_to_java_column(col), options) + return Column(jc) + + @since(1.5) def size(col): """ http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index cf8fb7e..c79f990 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -527,7 +527,8 @@ object FunctionRegistry { // csv expression[CsvToStructs]("from_csv"), - expression[SchemaOfCsv]("schema_of_csv") + expression[SchemaOfCsv]("schema_of_csv"), + expression[StructsToCsv]("to_csv") ) val builtin: SimpleFunctionRegistry = { http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala new file mode 100644 index 0000000..1218f92 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import java.io.Writer + +import com.univocity.parsers.csv.CsvWriter + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ + +class UnivocityGenerator( + schema: StructType, + writer: Writer, + options: CSVOptions) { + private val writerSettings = options.asWriterSettings + writerSettings.setHeaders(schema.fieldNames: _*) + private val gen = new CsvWriter(writer, writerSettings) + private var printHeader = options.headerFlag + + // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. + // When the value is null, this converter should not be called. 
+ private type ValueConverter = (InternalRow, Int) => String + + // `ValueConverter`s for all values in the fields of the schema + private val valueConverters: Array[ValueConverter] = + schema.map(_.dataType).map(makeConverter).toArray + + private def makeConverter(dataType: DataType): ValueConverter = dataType match { + case DateType => + (row: InternalRow, ordinal: Int) => + options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + + case TimestampType => + (row: InternalRow, ordinal: Int) => + options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + + case udt: UserDefinedType[_] => makeConverter(udt.sqlType) + + case dt: DataType => + (row: InternalRow, ordinal: Int) => + row.get(ordinal, dt).toString + } + + private def convertRow(row: InternalRow): Seq[String] = { + var i = 0 + val values = new Array[String](row.numFields) + while (i < row.numFields) { + if (!row.isNullAt(i)) { + values(i) = valueConverters(i).apply(row, i) + } else { + values(i) = options.nullValue + } + i += 1 + } + values + } + + /** + * Writes a single InternalRow to CSV using Univocity. 
+ */ + def write(row: InternalRow): Unit = { + if (printHeader) { + gen.writeHeaders() + } + gen.writeRow(convertRow(row): _*) + printHeader = false + } + + def writeToString(row: InternalRow): String = { + gen.writeRowToString(convertRow(row): _*) + } + + def close(): Unit = gen.close() + + def flush(): Unit = gen.flush() +} http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index e70296f..74b670a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.io.CharArrayWriter + import com.univocity.parsers.csv.CsvParser import org.apache.spark.sql.AnalysisException @@ -174,3 +176,68 @@ case class SchemaOfCsv( override def prettyName: String = "schema_of_csv" } + +/** + * Converts a [[StructType]] to a CSV output string. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(expr[, options]) - Returns a CSV string with a given struct value", + examples = """ + Examples: + > SELECT _FUNC_(named_struct('a', 1, 'b', 2)); + 1,2 + > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); + "26/08/2015" + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class StructsToCsv( + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { + override def nullable: Boolean = true + + def this(options: Map[String, String], child: Expression) = this(options, child, None) + + // Used in `FunctionRegistry` + def this(child: Expression) = this(Map.empty, child, None) + + def this(child: Expression, options: Expression) = + this( + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + @transient + lazy val writer = new CharArrayWriter() + + @transient + lazy val inputSchema: StructType = child.dataType match { + case st: StructType => st + case other => + throw new IllegalArgumentException(s"Unsupported input type ${other.catalogString}") + } + + @transient + lazy val gen = new UnivocityGenerator( + inputSchema, writer, new CSVOptions(options, columnPruning = true, timeZoneId.get)) + + // This converts rows to the CSV output according to the given schema. 
+ @transient + lazy val converter: Any => UTF8String = { + (row: Any) => UTF8String.fromString(gen.writeToString(row.asInstanceOf[InternalRow])) + } + + override def dataType: DataType = StringType + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + + override def nullSafeEval(value: Any): Any = converter(value) + + override def inputTypes: Seq[AbstractDataType] = StructType :: Nil + + override def prettyName: String = "to_csv" +} http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 386e0d1..d006197 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -165,4 +165,48 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P new SchemaOfCsv(Literal.create("1|abc"), Map("delimiter" -> "|")), "struct<_c0:int,_c1:string>") } + + test("to_csv - struct") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + val struct = Literal.create(create_row(1), schema) + checkEvaluation(StructsToCsv(Map.empty, struct, gmtId), "1") + } + + test("to_csv null input column") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + val struct = Literal.create(null, schema) + checkEvaluation( + StructsToCsv(Map.empty, struct, gmtId), + null + ) + } + + test("to_csv with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + 
c.set(Calendar.MILLISECOND, 0) + val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) + + checkEvaluation(StructsToCsv(Map.empty, struct, gmtId), "2016-01-01T00:00:00.000Z") + checkEvaluation( + StructsToCsv(Map.empty, struct, Option("PST")), "2015-12-31T16:00:00.000-08:00") + + checkEvaluation( + StructsToCsv( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> gmtId.get), + struct, + gmtId), + "2016-01-01T00:00:00" + ) + checkEvaluation( + StructsToCsv( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> "PST"), + struct, + gmtId), + "2015-12-31T16:00:00" + ) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 954a5a9..964b56e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityGenerator, UnivocityParser} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ 
http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala deleted file mode 100644 index 37d9d9a..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.csv - -import java.io.Writer - -import com.univocity.parsers.csv.CsvWriter - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ - -private[csv] class UnivocityGenerator( - schema: StructType, - writer: Writer, - options: CSVOptions) { - private val writerSettings = options.asWriterSettings - writerSettings.setHeaders(schema.fieldNames: _*) - private val gen = new CsvWriter(writer, writerSettings) - private var printHeader = options.headerFlag - - // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. - // When the value is null, this converter should not be called. - private type ValueConverter = (InternalRow, Int) => String - - // `ValueConverter`s for all values in the fields of the schema - private val valueConverters: Array[ValueConverter] = - schema.map(_.dataType).map(makeConverter).toArray - - private def makeConverter(dataType: DataType): ValueConverter = dataType match { - case DateType => - (row: InternalRow, ordinal: Int) => - options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) - - case TimestampType => - (row: InternalRow, ordinal: Int) => - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) - - case udt: UserDefinedType[_] => makeConverter(udt.sqlType) - - case dt: DataType => - (row: InternalRow, ordinal: Int) => - row.get(ordinal, dt).toString - } - - private def convertRow(row: InternalRow): Seq[String] = { - var i = 0 - val values = new Array[String](row.numFields) - while (i < row.numFields) { - if (!row.isNullAt(i)) { - values(i) = valueConverters(i).apply(row, i) - } else { - values(i) = options.nullValue - } - i += 1 - } - values - } - - /** - * Writes a single InternalRow to CSV using Univocity. 
- */ - def write(row: InternalRow): Unit = { - if (printHeader) { - gen.writeHeaders() - } - gen.writeRow(convertRow(row): _*) - printHeader = false - } - - def close(): Unit = gen.close() - - def flush(): Unit = gen.flush() -} http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index f8c4d88..6bb1a49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3905,6 +3905,32 @@ object functions { withExpr(SchemaOfCsv(csv.expr, options.asScala.toMap)) } + /** + * (Java-specific) Converts a column containing a `StructType` into a CSV string with + * the specified schema. Throws an exception, in the case of an unsupported type. + * + * @param e a column containing a struct. + * @param options options to control how the struct column is converted into a CSV string. + * It accepts the same options as the CSV data source. + * + * @group collection_funcs + * @since 3.0.0 + */ + def to_csv(e: Column, options: java.util.Map[String, String]): Column = withExpr { + StructsToCsv(options.asScala.toMap, e.expr) + } + + /** + * Converts a column containing a `StructType` into a CSV string with the specified schema. + * Throws an exception, in the case of an unsupported type. + * + * @param e a column containing a struct. 
+ * + * @group collection_funcs + * @since 3.0.0 + */ + def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava) + // scalastyle:off line.size.limit // scalastyle:off parameter.number http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql index 5be6f80..a1a4bc9 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql @@ -15,3 +15,9 @@ CREATE TEMPORARY VIEW csvTable(csvField, a) AS SELECT * FROM VALUES ('1,abc', 'a SELECT schema_of_csv(csvField) FROM csvTable; -- Clean up DROP VIEW IF EXISTS csvTable; +-- to_csv +select to_csv(named_struct('a', 1, 'b', 2)); +select to_csv(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); +-- Check if errors handled +select to_csv(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')); +select to_csv(named_struct('a', 1, 'b', 2), map('mode', 1)); http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index 677bbd9..03d4bff 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 13 +-- Number of queries: 17 -- !query 0 @@ -117,3 +117,37 @@ DROP VIEW IF EXISTS csvTable struct<> -- !query 12 output + + +-- !query 13 +select 
to_csv(named_struct('a', 1, 'b', 2)) +-- !query 13 schema +struct<to_csv(named_struct(a, 1, b, 2)):string> +-- !query 13 output +1,2 + + +-- !query 14 +select to_csv(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')) +-- !query 14 schema +struct<to_csv(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string> +-- !query 14 output +26/08/2015 + + +-- !query 15 +select to_csv(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +-- !query 15 schema +struct<> +-- !query 15 output +org.apache.spark.sql.AnalysisException +Must use a map() function for options;; line 1 pos 7 + + +-- !query 16 +select to_csv(named_struct('a', 1, 'b', 2), map('mode', 1)) +-- !query 16 schema +struct<> +-- !query 16 output +org.apache.spark.sql.AnalysisException +A type of keys and values in map() must be string, but got map<string,int>;; line 1 pos 7 http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 9395f05..eb6b248 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -45,7 +45,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) } - test("checking the columnNameOfCorruptRecord option") { val columnNameOfCorruptRecord = "_unparsed" val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS() @@ -74,4 +73,17 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { .select(schema_of_csv(lit("0.1 1"), Map("sep" -> " ").asJava)) checkAnswer(df, Seq(Row("struct<_c0:double,_c1:int>"))) } + + test("to_csv - struct") { + 
val df = Seq(Tuple1(Tuple1(1))).toDF("a") + + checkAnswer(df.select(to_csv($"a")), Row("1") :: Nil) + } + + test("to_csv with option") { + val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") + val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava + + checkAnswer(df.select(to_csv($"a", options)), Row("26/08/2015 18:00") :: Nil) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org