Repository: spark
Updated Branches:
  refs/heads/master 1a7abf3f4 -> 39399f40b


[SPARK-25638][SQL] Adding new function - to_csv()

## What changes were proposed in this pull request?

The new function takes a struct and converts it to a CSV string using the passed CSV options. It accepts the same CSV options as the CSV data source does.
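
For illustration, a minimal Scala sketch of the new API (assumes an active `SparkSession` with `spark.implicits._` in scope; the overloads match the `functions.scala` changes below):

```scala
import org.apache.spark.sql.functions.to_csv

// A DataFrame with a struct column "value" (struct<_1:string,_2:int>).
val df = Seq((1, ("Alice", 2))).toDF("key", "value")

// Default options: struct fields are joined with commas.
df.select(to_csv($"value")).collect()   // Array([Alice,2])
```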

## How was this patch tested?

Added tests to `CsvExpressionsSuite` and `CsvFunctionsSuite`, as well as R, Python and SQL tests similar to those for `to_json()`.

Closes #22626 from MaxGekk/to_csv.

Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39399f40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39399f40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39399f40

Branch: refs/heads/master
Commit: 39399f40b861f7d8e60d0e25d2f8801343477834
Parents: 1a7abf3
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Sun Nov 4 14:57:38 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Sun Nov 4 14:57:38 2018 +0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |  1 +
 R/pkg/R/functions.R                             | 31 +++++--
 R/pkg/R/generics.R                              |  4 +
 R/pkg/tests/fulltests/test_sparkSQL.R           |  5 ++
 python/pyspark/sql/functions.py                 | 22 +++++
 .../catalyst/analysis/FunctionRegistry.scala    |  3 +-
 .../sql/catalyst/csv/UnivocityGenerator.scala   | 93 ++++++++++++++++++++
 .../catalyst/expressions/csvExpressions.scala   | 67 ++++++++++++++
 .../expressions/CsvExpressionsSuite.scala       | 44 +++++++++
 .../datasources/csv/CSVFileFormat.scala         |  2 +-
 .../datasources/csv/UnivocityGenerator.scala    | 90 -------------------
 .../scala/org/apache/spark/sql/functions.scala  | 26 ++++++
 .../sql-tests/inputs/csv-functions.sql          |  6 ++
 .../sql-tests/results/csv-functions.sql.out     | 36 +++++++-
 .../apache/spark/sql/CsvFunctionsSuite.scala    | 14 ++-
 15 files changed, 345 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index f9f556e..9d4f05a 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -380,6 +380,7 @@ exportMethods("%<=>%",
               "tanh",
               "toDegrees",
               "toRadians",
+              "to_csv",
               "to_date",
               "to_json",
               "to_timestamp",

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d2ca1d6..9292363 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -187,6 +187,7 @@ NULL
 #'          \itemize{
 #'          \item \code{to_json}: it is the column containing the struct, array of the structs,
 #'              the map or array of maps.
+#'          \item \code{to_csv}: it is the column containing the struct.
 #'          \item \code{from_json}: it is the column containing the JSON string.
 #'          \item \code{from_csv}: it is the column containing the CSV string.
 #'          }
@@ -204,11 +205,11 @@ NULL
 #'              also supported for the schema.
 #'          \item \code{from_csv}: a DDL-formatted string
 #'          }
-#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
-#'            additional named properties to control how it is converted, accepts the same
-#'            options as the JSON data source. Additionally \code{to_json} supports the "pretty"
-#'            option which enables pretty JSON generation. In \code{arrays_zip}, this contains
-#'            additional Columns of arrays to be merged.
+#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
+#'            this contains additional named properties to control how it is converted, accepts
+#'            the same options as the JSON/CSV data source. Additionally \code{to_json} supports
+#'            the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
+#'            this contains additional Columns of arrays to be merged.
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions
@@ -1741,6 +1742,26 @@ setMethod("to_json", signature(x = "Column"),
           })
 
 #' @details
+#' \code{to_csv}: Converts a column containing a \code{structType} into a Column of CSV string.
+#' Resolving the Column can fail if an unsupported type is encountered.
+#'
+#' @rdname column_collection_functions
+#' @aliases to_csv to_csv,Column-method
+#' @examples
+#'
+#' \dontrun{
+#' # Converts a struct into a CSV string
+#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
+#' select(df2, to_csv(df2$d, dateFormat = 'dd/MM/yyyy'))}
+#' @note to_csv since 3.0.0
+setMethod("to_csv", signature(x = "Column"),
+          function(x, ...) {
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_csv", x@jc, options)
+            column(jc)
+          })
+
+#' @details
 #' \code{to_timestamp}: Converts the column into a TimestampType. You may optionally specify
 #' a format according to the rules in:
 #' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 76e17c1..463102c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1303,6 +1303,10 @@ setGeneric("to_date", function(x, format) { standardGeneric("to_date") })
 #' @name NULL
 setGeneric("to_json", function(x, ...) { standardGeneric("to_json") })
 
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("to_csv", function(x, ...) { standardGeneric("to_csv") })
+
 #' @rdname column_datetime_functions
 #' @name NULL
 setGeneric("to_timestamp", function(x, format) { 
standardGeneric("to_timestamp") })

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 58e0a54..faec387 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1689,6 +1689,11 @@ test_that("column functions", {
     expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
   }
 
+  # Test to_csv()
+  df <- sql("SELECT named_struct('name', 'Bob') as people")
+  j <- collect(select(df, alias(to_csv(df$people), "csv")))
+  expect_equal(j[order(j$csv), ][1], "Bob")
+
   # Test create_array() and create_map()
   df <- as.DataFrame(data.frame(
     x = c(1.0, 2.0), y = c(-1.0, 3.0), z = c(-2.0, 5.0)

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index beb1a06..24824ef 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2391,6 +2391,28 @@ def schema_of_csv(csv, options={}):
     return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(3.0)
+def to_csv(col, options={}):
+    """
+    Converts a column containing a :class:`StructType` into a CSV string.
+    Throws an exception, in the case of an unsupported type.
+
+    :param col: name of column containing a struct.
+    :param options: options to control converting. Accepts the same options as the CSV datasource.
+
+    >>> from pyspark.sql import Row
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_csv(df.value).alias("csv")).collect()
+    [Row(csv=u'2,Alice')]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.to_csv(_to_java_column(col), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index cf8fb7e..c79f990 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -527,7 +527,8 @@ object FunctionRegistry {
 
     // csv
     expression[CsvToStructs]("from_csv"),
-    expression[SchemaOfCsv]("schema_of_csv")
+    expression[SchemaOfCsv]("schema_of_csv"),
+    expression[StructsToCsv]("to_csv")
   )
 
   val builtin: SimpleFunctionRegistry = {

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
new file mode 100644
index 0000000..1218f92
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import java.io.Writer
+
+import com.univocity.parsers.csv.CsvWriter
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+class UnivocityGenerator(
+    schema: StructType,
+    writer: Writer,
+    options: CSVOptions) {
+  private val writerSettings = options.asWriterSettings
+  writerSettings.setHeaders(schema.fieldNames: _*)
+  private val gen = new CsvWriter(writer, writerSettings)
+  private var printHeader = options.headerFlag
+
+  // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+  // When the value is null, this converter should not be called.
+  private type ValueConverter = (InternalRow, Int) => String
+
+  // `ValueConverter`s for all values in the fields of the schema
+  private val valueConverters: Array[ValueConverter] =
+    schema.map(_.dataType).map(makeConverter).toArray
+
+  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+    case DateType =>
+      (row: InternalRow, ordinal: Int) =>
+        options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+
+    case TimestampType =>
+      (row: InternalRow, ordinal: Int) =>
+        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+
+    case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
+
+    case dt: DataType =>
+      (row: InternalRow, ordinal: Int) =>
+        row.get(ordinal, dt).toString
+  }
+
+  private def convertRow(row: InternalRow): Seq[String] = {
+    var i = 0
+    val values = new Array[String](row.numFields)
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        values(i) = valueConverters(i).apply(row, i)
+      } else {
+        values(i) = options.nullValue
+      }
+      i += 1
+    }
+    values
+  }
+
+  /**
+   * Writes a single InternalRow to CSV using Univocity.
+   */
+  def write(row: InternalRow): Unit = {
+    if (printHeader) {
+      gen.writeHeaders()
+    }
+    gen.writeRow(convertRow(row): _*)
+    printHeader = false
+  }
+
+  def writeToString(row: InternalRow): String = {
+    gen.writeRowToString(convertRow(row): _*)
+  }
+
+  def close(): Unit = gen.close()
+
+  def flush(): Unit = gen.flush()
+}
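
For orientation (not part of the patch), a sketch of driving this generator for single-row conversion, mirroring the `StructsToCsv` call site added below:

```scala
import java.io.CharArrayWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityGenerator}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)
val gen = new UnivocityGenerator(
  schema, new CharArrayWriter(),
  new CSVOptions(Map.empty, columnPruning = true, "GMT"))

// writeToString renders one InternalRow as a single CSV line.
gen.writeToString(InternalRow(1))   // "1"
```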

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index e70296f..74b670a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.io.CharArrayWriter
+
 import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.sql.AnalysisException
@@ -174,3 +176,68 @@ case class SchemaOfCsv(
 
   override def prettyName: String = "schema_of_csv"
 }
+
+/**
+ * Converts a [[StructType]] to a CSV output string.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr[, options]) - Returns a CSV string with a given struct value",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
+       1,2
+      > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+       "26/08/2015"
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class StructsToCsv(
+     options: Map[String, String],
+     child: Expression,
+     timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression) = this(Map.empty, child, None)
+
+  def this(child: Expression, options: Expression) =
+    this(
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  @transient
+  lazy val writer = new CharArrayWriter()
+
+  @transient
+  lazy val inputSchema: StructType = child.dataType match {
+    case st: StructType => st
+    case other =>
+      throw new IllegalArgumentException(s"Unsupported input type ${other.catalogString}")
+  }
+
+  @transient
+  lazy val gen = new UnivocityGenerator(
+    inputSchema, writer, new CSVOptions(options, columnPruning = true, timeZoneId.get))
+
+  // This converts rows to the CSV output according to the given schema.
+  @transient
+  lazy val converter: Any => UTF8String = {
+    (row: Any) => UTF8String.fromString(gen.writeToString(row.asInstanceOf[InternalRow]))
+  }
+
+  override def dataType: DataType = StringType
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  override def nullSafeEval(value: Any): Any = converter(value)
+
+  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+
+  override def prettyName: String = "to_csv"
+}
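
Note that `timeZoneId` is filled in by the analyzer before evaluation, hence the bare `timeZoneId.get` above. A quick end-to-end check through SQL, matching the first example in the usage string (sketch only; assumes an active `SparkSession` named `spark`):

```scala
spark.sql("SELECT to_csv(named_struct('a', 1, 'b', 2))").collect()
// => Array([1,2])
```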

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
index 386e0d1..d006197 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
@@ -165,4 +165,48 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P
       new SchemaOfCsv(Literal.create("1|abc"), Map("delimiter" -> "|")),
       "struct<_c0:int,_c1:string>")
   }
+
+  test("to_csv - struct") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val struct = Literal.create(create_row(1), schema)
+    checkEvaluation(StructsToCsv(Map.empty, struct, gmtId), "1")
+  }
+
+  test("to_csv null input column") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val struct = Literal.create(null, schema)
+    checkEvaluation(
+      StructsToCsv(Map.empty, struct, gmtId),
+      null
+    )
+  }
+
+  test("to_csv with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+    val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+    checkEvaluation(StructsToCsv(Map.empty, struct, gmtId), "2016-01-01T00:00:00.000Z")
+    checkEvaluation(
+      StructsToCsv(Map.empty, struct, Option("PST")), 
"2015-12-31T16:00:00.000-08:00")
+
+    checkEvaluation(
+      StructsToCsv(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+          DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
+        struct,
+        gmtId),
+      "2016-01-01T00:00:00"
+    )
+    checkEvaluation(
+      StructsToCsv(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+          DateTimeUtils.TIMEZONE_OPTION -> "PST"),
+        struct,
+        gmtId),
+      "2015-12-31T16:00:00"
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 954a5a9..964b56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityGenerator, UnivocityParser}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
deleted file mode 100644
index 37d9d9a..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.io.Writer
-
-import com.univocity.parsers.csv.CsvWriter
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-private[csv] class UnivocityGenerator(
-    schema: StructType,
-    writer: Writer,
-    options: CSVOptions) {
-  private val writerSettings = options.asWriterSettings
-  writerSettings.setHeaders(schema.fieldNames: _*)
-  private val gen = new CsvWriter(writer, writerSettings)
-  private var printHeader = options.headerFlag
-
-  // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
-  // When the value is null, this converter should not be called.
-  private type ValueConverter = (InternalRow, Int) => String
-
-  // `ValueConverter`s for all values in the fields of the schema
-  private val valueConverters: Array[ValueConverter] =
-    schema.map(_.dataType).map(makeConverter).toArray
-
-  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
-    case DateType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
-
-    case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
-
-    case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
-
-    case dt: DataType =>
-      (row: InternalRow, ordinal: Int) =>
-        row.get(ordinal, dt).toString
-  }
-
-  private def convertRow(row: InternalRow): Seq[String] = {
-    var i = 0
-    val values = new Array[String](row.numFields)
-    while (i < row.numFields) {
-      if (!row.isNullAt(i)) {
-        values(i) = valueConverters(i).apply(row, i)
-      } else {
-        values(i) = options.nullValue
-      }
-      i += 1
-    }
-    values
-  }
-
-  /**
-   * Writes a single InternalRow to CSV using Univocity.
-   */
-  def write(row: InternalRow): Unit = {
-    if (printHeader) {
-      gen.writeHeaders()
-    }
-    gen.writeRow(convertRow(row): _*)
-    printHeader = false
-  }
-
-  def close(): Unit = gen.close()
-
-  def flush(): Unit = gen.flush()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index f8c4d88..6bb1a49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3905,6 +3905,32 @@ object functions {
     withExpr(SchemaOfCsv(csv.expr, options.asScala.toMap))
   }
 
+  /**
+   * (Java-specific) Converts a column containing a `StructType` into a CSV string with
+   * the specified schema. Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a column containing a struct.
+   * @param options options to control how the struct column is converted into a CSV string.
+   *                It accepts the same options as the CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def to_csv(e: Column, options: java.util.Map[String, String]): Column = withExpr {
+    StructsToCsv(options.asScala.toMap, e.expr)
+  }
+
+  /**
+   * Converts a column containing a `StructType` into a CSV string with the specified schema.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a column containing a struct.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
+
   // scalastyle:off line.size.limit
   // scalastyle:off parameter.number
 
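
A brief usage sketch of the two overloads above (illustrative only; it mirrors the `CsvFunctionsSuite` cases below and assumes `spark.implicits._` is in scope):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.to_csv

val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")

// No options: the default timestampFormat applies.
df.select(to_csv($"a"))

// Java-specific overload: options are passed as a java.util.Map.
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava
df.select(to_csv($"a", options)).collect()   // Array([26/08/2015 18:00])
```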

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
index 5be6f80..a1a4bc9 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
@@ -15,3 +15,9 @@ CREATE TEMPORARY VIEW csvTable(csvField, a) AS SELECT * FROM VALUES ('1,abc', 'a
 SELECT schema_of_csv(csvField) FROM csvTable;
 -- Clean up
 DROP VIEW IF EXISTS csvTable;
+-- to_csv
+select to_csv(named_struct('a', 1, 'b', 2));
+select to_csv(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select to_csv(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
+select to_csv(named_struct('a', 1, 'b', 2), map('mode', 1));

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
index 677bbd9..03d4bff 100644
--- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 13
+-- Number of queries: 17
 
 
 -- !query 0
@@ -117,3 +117,37 @@ DROP VIEW IF EXISTS csvTable
 struct<>
 -- !query 12 output
 
+
+
+-- !query 13
+select to_csv(named_struct('a', 1, 'b', 2))
+-- !query 13 schema
+struct<to_csv(named_struct(a, 1, b, 2)):string>
+-- !query 13 output
+1,2
+
+
+-- !query 14
select to_csv(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 14 schema
struct<to_csv(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string>
+-- !query 14 output
+26/08/2015
+
+
+-- !query 15
+select to_csv(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
+-- !query 15 schema
+struct<>
+-- !query 15 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 16
+select to_csv(named_struct('a', 1, 'b', 2), map('mode', 1))
+-- !query 16 schema
+struct<>
+-- !query 16 output
+org.apache.spark.sql.AnalysisException
A type of keys and values in map() must be string, but got map<string,int>;; line 1 pos 7

http://git-wip-us.apache.org/repos/asf/spark/blob/39399f40/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 9395f05..eb6b248 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -45,7 +45,6 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
   }
 
-
   test("checking the columnNameOfCorruptRecord option") {
     val columnNameOfCorruptRecord = "_unparsed"
     val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS()
@@ -74,4 +73,17 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
       .select(schema_of_csv(lit("0.1 1"), Map("sep" -> " ").asJava))
     checkAnswer(df, Seq(Row("struct<_c0:double,_c1:int>")))
   }
+
+  test("to_csv - struct") {
+    val df = Seq(Tuple1(Tuple1(1))).toDF("a")
+
+    checkAnswer(df.select(to_csv($"a")), Row("1") :: Nil)
+  }
+
+  test("to_csv with option") {
+    val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
+    val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava
+
+    checkAnswer(df.select(to_csv($"a", options)), Row("26/08/2015 18:00") :: 
Nil)
+  }
 }

