Repository: spark
Updated Branches:
  refs/heads/master 9d4dd7992 -> e9af9460b


http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
deleted file mode 100644
index 492a21b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.nio.charset.StandardCharsets
-import java.util.{Locale, TimeZone}
-
-import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util._
-
-class CSVOptions(
-    @transient val parameters: CaseInsensitiveMap[String],
-    val columnPruning: Boolean,
-    defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
-  extends Logging with Serializable {
-
-  def this(
-    parameters: Map[String, String],
-    columnPruning: Boolean,
-    defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
-      this(
-        CaseInsensitiveMap(parameters),
-        columnPruning,
-        defaultTimeZoneId,
-        defaultColumnNameOfCorruptRecord)
-  }
-
-  private def getChar(paramName: String, default: Char): Char = {
-    val paramValue = parameters.get(paramName)
-    paramValue match {
-      case None => default
-      case Some(null) => default
-      case Some(value) if value.length == 0 => '\u0000'
-      case Some(value) if value.length == 1 => value.charAt(0)
-      case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
-    }
-  }
-
-  private def getInt(paramName: String, default: Int): Int = {
-    val paramValue = parameters.get(paramName)
-    paramValue match {
-      case None => default
-      case Some(null) => default
-      case Some(value) => try {
-        value.toInt
-      } catch {
-        case e: NumberFormatException =>
-          throw new RuntimeException(s"$paramName should be an integer. Found $value")
-      }
-    }
-  }
-
-  private def getBool(paramName: String, default: Boolean = false): Boolean = {
-    val param = parameters.getOrElse(paramName, default.toString)
-    if (param == null) {
-      default
-    } else if (param.toLowerCase(Locale.ROOT) == "true") {
-      true
-    } else if (param.toLowerCase(Locale.ROOT) == "false") {
-      false
-    } else {
-      throw new Exception(s"$paramName flag can be true or false")
-    }
-  }
-
-  val delimiter = CSVUtils.toChar(
-    parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode: ParseMode =
-    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
-  val charset = parameters.getOrElse("encoding",
-    parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
-
-  val quote = getChar("quote", '\"')
-  val escape = getChar("escape", '\\')
-  val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") match {
-    case None => None
-    case Some(null) => None
-    case Some(value) if value.length == 0 => None
-    case Some(value) if value.length == 1 => Some(value.charAt(0))
-    case _ =>
-      throw new RuntimeException("charToEscapeQuoteEscaping cannot be more than one character")
-  }
-  val comment = getChar("comment", '\u0000')
-
-  val headerFlag = getBool("header")
-  val inferSchemaFlag = getBool("inferSchema")
-  val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
-  val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
-
-  // For write, both options were `true` by default. We leave it as `true` for
-  // backwards compatibility.
-  val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
-  val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)
-
-  val columnNameOfCorruptRecord =
-    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
-
-  val nullValue = parameters.getOrElse("nullValue", "")
-
-  val nanValue = parameters.getOrElse("nanValue", "NaN")
-
-  val positiveInf = parameters.getOrElse("positiveInf", "Inf")
-  val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
-
-
-  val compressionCodec: Option[String] = {
-    val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CompressionCodecs.getCodecClassName)
-  }
-
-  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
-    parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
-
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
-
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
-
-  val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
-
-  val maxColumns = getInt("maxColumns", 20480)
-
-  val maxCharsPerColumn = getInt("maxCharsPerColumn", -1)
-
-  val escapeQuotes = getBool("escapeQuotes", true)
-
-  val quoteAll = getBool("quoteAll", false)
-
-  val inputBufferSize = 128
-
-  val isCommentSet = this.comment != '\u0000'
-
-  val samplingRatio =
-    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-
-  /**
-   * Forcibly apply the specified or inferred schema to datasource files.
-   * If the option is enabled, headers of CSV files will be ignored.
-   */
-  val enforceSchema = getBool("enforceSchema", default = true)
-
-
-  /**
-   * String representation of an empty value in read and in write.
-   */
-  val emptyValue = parameters.get("emptyValue")
-  /**
-   * The string is returned when CSV reader doesn't have any characters for input value,
-   * or an empty quoted string `""`. Default value is empty string.
-   */
-  val emptyValueInRead = emptyValue.getOrElse("")
-  /**
-   * The value is used instead of an empty string in write. Default value is `""`
-   */
-  val emptyValueInWrite = emptyValue.getOrElse("\"\"")
-
-  def asWriterSettings: CsvWriterSettings = {
-    val writerSettings = new CsvWriterSettings()
-    val format = writerSettings.getFormat
-    format.setDelimiter(delimiter)
-    format.setQuote(quote)
-    format.setQuoteEscape(escape)
-    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
-    writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
-    writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
-    writerSettings.setNullValue(nullValue)
-    writerSettings.setEmptyValue(emptyValueInWrite)
-    writerSettings.setSkipEmptyLines(true)
-    writerSettings.setQuoteAllFields(quoteAll)
-    writerSettings.setQuoteEscapingEnabled(escapeQuotes)
-    writerSettings
-  }
-
-  def asParserSettings: CsvParserSettings = {
-    val settings = new CsvParserSettings()
-    val format = settings.getFormat
-    format.setDelimiter(delimiter)
-    format.setQuote(quote)
-    format.setQuoteEscape(escape)
-    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
-    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
-    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
-    settings.setReadInputOnSeparateThread(false)
-    settings.setInputBufferSize(inputBufferSize)
-    settings.setMaxColumns(maxColumns)
-    settings.setNullValue(nullValue)
-    settings.setEmptyValue(emptyValueInRead)
-    settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
-    settings
-  }
-}
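
For context, the options resolved by the class above are the public CSV data source options; a minimal, illustrative read-side sketch (the file path and option values are placeholders, and `spark` is assumed to be an active SparkSession):

  // Illustrative only: these reader options feed the CSVOptions fields above
  // ("sep" -> delimiter, "header" -> headerFlag, "mode" -> parseMode, ...).
  val df = spark.read
    .option("sep", "|")              // single character, parsed by CSVUtils.toChar
    .option("quote", "\"")
    .option("escape", "\\")
    .option("comment", "#")          // sets isCommentSet since it is not '\u0000'
    .option("header", "true")
    .option("nullValue", "NA")       // placeholder value
    .option("mode", "PERMISSIVE")
    .csv("/path/to/data.csv")        // placeholder path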

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 0a7473c..21fabac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.catalyst.csv.CSVExprUtils
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.functions._
 
 object CSVUtils {
@@ -40,16 +42,6 @@ object CSVUtils {
   }
 
   /**
-   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
-   * This is currently being used in CSV reading path and CSV schema inference.
-   */
-  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    iter.filter { line =>
-      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
-    }
-  }
-
-  /**
    * Skip the given first line so that only data can remain in a dataset.
    * This is similar with `dropHeaderLine` below and currently being used in CSV schema inference.
    */
@@ -67,29 +59,6 @@ object CSVUtils {
     }
   }
 
-  def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    if (options.isCommentSet) {
-      val commentPrefix = options.comment.toString
-      iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
-      }
-    } else {
-      iter.dropWhile(_.trim.isEmpty)
-    }
-  }
-
-  /**
-   * Extracts header and moves iterator forward so that only data remains in it
-   */
-  def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
-    val nonEmptyLines = skipComments(iter, options)
-    if (nonEmptyLines.hasNext) {
-      Some(nonEmptyLines.next())
-    } else {
-      None
-    }
-  }
-
   /**
    * Generates a header from the given row which is null-safe and duplicate-safe.
    */
@@ -133,35 +102,6 @@ object CSVUtils {
   }
 
   /**
-   * Helper method that converts string representation of a character to actual character.
-   * It handles some Java escaped strings and throws exception if given string is longer than one
-   * character.
-   */
-  @throws[IllegalArgumentException]
-  def toChar(str: String): Char = {
-    (str: Seq[Char]) match {
-      case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
-      case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." +
-        " It has special meaning as beginning of an escape sequence." +
-        " To get the backslash character, pass a string with two backslashes as the delimiter.")
-      case Seq(c) => c
-      case Seq('\\', 't') => '\t'
-      case Seq('\\', 'r') => '\r'
-      case Seq('\\', 'b') => '\b'
-      case Seq('\\', 'f') => '\f'
-      // In case user changes quote char and uses \" as delimiter in options
-      case Seq('\\', '\"') => '\"'
-      case Seq('\\', '\'') => '\''
-      case Seq('\\', '\\') => '\\'
-      case _ if str == """\u0000""" => '\u0000'
-      case Seq('\\', _) =>
-        throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
-      case _ =>
-        throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
-    }
-  }
-
-  /**
    * Sample CSV dataset as configured by `samplingRatio`.
    */
   def sample(csv: Dataset[String], options: CSVOptions): Dataset[String] = {
@@ -186,4 +126,7 @@ object CSVUtils {
       csv.sample(withReplacement = false, options.samplingRatio, 1)
     }
   }
+
+  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] =
+    CSVExprUtils.filterCommentAndEmpty(iter, options)
 }
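
The delegation above keeps the behaviour of the removed method; a standalone sketch of that filtering logic, adapted from the deleted body shown earlier (the name `commentChar` stands in for `options.comment`):

  // Keeps non-blank lines that do not start with the comment character.
  def filterCommentAndEmptySketch(iter: Iterator[String], commentChar: Char): Iterator[String] =
    iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(commentChar.toString)
    }

  // Example: only "a,b,c" and "1,2,3" survive.
  filterCommentAndEmptySketch(Iterator("# comment", "", "a,b,c", "1,2,3"), '#').toList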

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index 4082a0d..37d9d9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -22,6 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
deleted file mode 100644
index fbd19c6..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.io.InputStream
-import java.math.BigDecimal
-
-import scala.util.Try
-import scala.util.control.NonFatal
-
-import com.univocity.parsers.csv.CsvParser
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
-import org.apache.spark.sql.execution.datasources.FailureSafeParser
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-/**
- * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
- *
- * @param dataSchema The CSV data schema that is specified by the user, or inferred from underlying
- *                   data files.
- * @param requiredSchema The schema of the data that should be output for each row. This should be a
- *                       subset of the columns in dataSchema.
- * @param options Configuration options for a CSV parser.
- */
-class UnivocityParser(
-    dataSchema: StructType,
-    requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
-  require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
-    s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
-      s"dataSchema (${dataSchema.catalogString}).")
-
-  def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
-
-  // A `ValueConverter` is responsible for converting the given value to a desired type.
-  private type ValueConverter = String => Any
-
-  // This index is used to reorder parsed tokens
-  private val tokenIndexArr =
-    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
-
-  // When column pruning is enabled, the parser only parses the required columns based on
-  // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
-
-  val tokenizer = {
-    val parserSetting = options.asParserSettings
-    // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
-    // parser select a sequence of fields for reading by their positions.
-    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-    if (parsedSchema.length < dataSchema.length) {
-      parserSetting.selectIndexes(tokenIndexArr: _*)
-    }
-    new CsvParser(parserSetting)
-  }
-
-  private val row = new GenericInternalRow(requiredSchema.length)
-
-  // Retrieve the raw record string.
-  private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
-  }
-
-  // This parser first picks some tokens from the input tokens, according to the required schema,
-  // then parse these tokens and put the values in a row, with the order specified by the required
-  // schema.
-  //
-  // For example, let's say there is CSV data as below:
-  //
-  //   a,b,c
-  //   1,2,A
-  //
-  // So the CSV data schema is: ["a", "b", "c"]
-  // And let's say the required schema is: ["c", "b"]
-  //
-  // with the input tokens,
-  //
-  //   input tokens - [1, 2, "A"]
-  //
-  // Each input token is placed in each output row's position by mapping these. In this case,
-  //
-  //   output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-  }
-
-  /**
-   * Create a converter which converts the string value to a value according to a desired type.
-   * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`).
-   *
-   * For other nullable types, returns null if it is null or equals to the value specified
-   * in `nullValue` option.
-   */
-  def makeConverter(
-      name: String,
-      dataType: DataType,
-      nullable: Boolean = true,
-      options: CSVOptions): ValueConverter = dataType match {
-    case _: ByteType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toByte)
-
-    case _: ShortType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toShort)
-
-    case _: IntegerType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toInt)
-
-    case _: LongType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toLong)
-
-    case _: FloatType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) {
-        case options.nanValue => Float.NaN
-        case options.negativeInf => Float.NegativeInfinity
-        case options.positiveInf => Float.PositiveInfinity
-        case datum => datum.toFloat
-      }
-
-    case _: DoubleType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) {
-        case options.nanValue => Double.NaN
-        case options.negativeInf => Double.NegativeInfinity
-        case options.positiveInf => Double.PositiveInfinity
-        case datum => datum.toDouble
-      }
-
-    case _: BooleanType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toBoolean)
-
-    case dt: DecimalType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        val value = new BigDecimal(datum.replaceAll(",", ""))
-        Decimal(value, dt.precision, dt.scale)
-      }
-
-    case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          DateTimeUtils.stringToTime(datum).getTime * 1000L
-        }
-      }
-
-    case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-        }
-      }
-
-    case _: StringType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
-
-    case udt: UserDefinedType[_] => (datum: String) =>
-      makeConverter(name, udt.sqlType, nullable, options)
-
-    // We don't actually hit this exception though, we keep it for understandability
-    case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
-  }
-
-  private def nullSafeDatum(
-       datum: String,
-       name: String,
-       nullable: Boolean,
-       options: CSVOptions)(converter: ValueConverter): Any = {
-    if (datum == options.nullValue || datum == null) {
-      if (!nullable) {
-        throw new RuntimeException(s"null value found but field $name is not nullable.")
-      }
-      null
-    } else {
-      converter.apply(datum)
-    }
-  }
-
-  /**
-   * Parses a single CSV string and turns it into either one resulting row or no row (if the
-   * record is malformed).
-   */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
-
-  private val getToken = if (options.columnPruning) {
-    (tokens: Array[String], index: Int) => tokens(index)
-  } else {
-    (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
-  }
-
-  private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens == null) {
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => None,
-        new RuntimeException("Malformed CSV record"))
-    } else if (tokens.length != parsedSchema.length) {
-      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
-      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
-      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (parsedSchema.length > tokens.length) {
-        tokens ++ new Array[String](parsedSchema.length - tokens.length)
-      } else {
-        tokens.take(parsedSchema.length)
-      }
-      def getPartialResult(): Option[InternalRow] = {
-        try {
-          Some(convert(checkedTokens))
-        } catch {
-          case _: BadRecordException => None
-        }
-      }
-      // For records with less or more tokens than the schema, tries to return partial results
-      // if possible.
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => getPartialResult(),
-        new RuntimeException("Malformed CSV record"))
-    } else {
-      try {
-        // When the length of the returned tokens is identical to the length of the parsed schema,
-        // we just need to convert the tokens that correspond to the required columns.
-        var i = 0
-        while (i < requiredSchema.length) {
-          row(i) = valueConverters(i).apply(getToken(tokens, i))
-          i += 1
-        }
-        row
-      } catch {
-        case NonFatal(e) =>
-          // For corrupted records with the number of tokens same as the schema,
-          // CSV reader doesn't support partial results. All fields other than the field
-          // configured by `columnNameOfCorruptRecord` are set to `null`.
-          throw BadRecordException(() => getCurrentInput, () => None, e)
-      }
-    }
-  }
-}
-
-private[csv] object UnivocityParser {
-
-  /**
-   * Parses a stream that contains CSV strings and turns it into an iterator of tokens.
-   */
-  def tokenizeStream(
-      inputStream: InputStream,
-      shouldDropHeader: Boolean,
-      tokenizer: CsvParser): Iterator[Array[String]] = {
-    val handleHeader: () => Unit =
-      () => if (shouldDropHeader) tokenizer.parseNext
-
-    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
-  }
-
-  /**
-   * Parses a stream that contains CSV strings and turns it into an iterator of rows.
-   */
-  def parseStream(
-      inputStream: InputStream,
-      parser: UnivocityParser,
-      headerChecker: CSVHeaderChecker,
-      schema: StructType): Iterator[InternalRow] = {
-    val tokenizer = parser.tokenizer
-    val safeParser = new FailureSafeParser[Array[String]](
-      input => Seq(parser.convert(input)),
-      parser.options.parseMode,
-      schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
-
-    val handleHeader: () => Unit =
-      () => headerChecker.checkHeaderColumnNames(tokenizer)
-
-    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
-      safeParser.parse(tokens)
-    }.flatten
-  }
-
-  private def convertStream[T](
-      inputStream: InputStream,
-      tokenizer: CsvParser,
-      handleHeader: () => Unit)(
-      convert: Array[String] => T) = new Iterator[T] {
-    tokenizer.beginParsing(inputStream)
-
-    // We can handle header here since here the stream is open.
-    handleHeader()
-
-    private var nextRecord = tokenizer.parseNext()
-
-    override def hasNext: Boolean = nextRecord != null
-
-    override def next(): T = {
-      if (!hasNext) {
-        throw new NoSuchElementException("End of stream")
-      }
-      val curRecord = convert(nextRecord)
-      nextRecord = tokenizer.parseNext()
-      curRecord
-    }
-  }
-
-  /**
-   * Parses an iterator that contains CSV strings and turns it into an iterator of rows.
-   */
-  def parseIterator(
-      lines: Iterator[String],
-      parser: UnivocityParser,
-      headerChecker: CSVHeaderChecker,
-      schema: StructType): Iterator[InternalRow] = {
-    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
-
-    val options = parser.options
-
-    val filteredLines: Iterator[String] = CSVUtils.filterCommentAndEmpty(lines, options)
-
-    val safeParser = new FailureSafeParser[String](
-      input => Seq(parser.parse(input)),
-      parser.options.parseMode,
-      schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
-    filteredLines.flatMap(safeParser.parse)
-  }
-}
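
The column-pruning comments in the removed parser boil down to a small index mapping; a self-contained sketch of that reordering (the names mirror the fields of the moved class, and the literal values are illustrative):

  // Data arrives in data-schema order ["a", "b", "c"]; only ["c", "b"] is required.
  val dataSchema = Seq("a", "b", "c")
  val requiredSchema = Seq("c", "b")
  val tokenIndexArr = requiredSchema.map(f => dataSchema.indexOf(f)).toArray  // Array(2, 1)

  val tokens = Array("1", "2", "A")                  // one parsed CSV line
  val reordered = tokenIndexArr.map(i => tokens(i))  // Array("A", "2")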

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 76f5837..c7608e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
+import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4247d31..8def996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. Accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. Accepts the same options as the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap))
+  }
+
   // scalastyle:off line.size.limit
   // scalastyle:off parameter.number
 
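
A minimal Scala usage of the new function, mirroring the test suite further down (assumes `spark.implicits._` is in scope for `toDS()` and `$"value"`):

  import org.apache.spark.sql.functions.from_csv
  import org.apache.spark.sql.types._

  val df = Seq("26/08/2015 18:00").toDS()
  val schema = new StructType().add("time", TimestampType)
  val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")

  // Produces a struct column named csvtostructs(value) holding 2015-08-26 18:00:00.
  df.select(from_csv($"value", schema, options))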

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
new file mode 100644
index 0000000..d2214fd
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
@@ -0,0 +1,9 @@
+-- from_csv
+select from_csv('1, 3.14', 'a INT, f FLOAT');
+select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select from_csv('1', 1);
+select from_csv('1', 'a InvalidType');
+select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'));
+select from_csv('1', 'a INT', map('mode', 1));
+select from_csv();

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
new file mode 100644
index 0000000..15dbe36
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -0,0 +1,69 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 7
+
+
+-- !query 0
+select from_csv('1, 3.14', 'a INT, f FLOAT')
+-- !query 0 schema
+struct<csvtostructs(1, 3.14):struct<a:int,f:float>>
+-- !query 0 output
+{"a":1,"f":3.14}
+
+
+-- !query 1
+select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 1 schema
+struct<csvtostructs(26/08/2015):struct<time:timestamp>>
+-- !query 1 output
+{"time":2015-08-26 00:00:00.0}
+
+
+-- !query 2
+select from_csv('1', 1)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+org.apache.spark.sql.AnalysisException
+Schema should be specified in DDL format as a string literal instead of 1;; line 1 pos 7
+
+
+-- !query 3
+select from_csv('1', 'a InvalidType')
+-- !query 3 schema
+struct<>
+-- !query 3 output
+org.apache.spark.sql.AnalysisException
+
+DataType invalidtype is not supported.(line 1, pos 2)
+
+== SQL ==
+a InvalidType
+--^^^
+; line 1 pos 7
+
+
+-- !query 4
+select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'))
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 5
+select from_csv('1', 'a INT', map('mode', 1))
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+A type of keys and values in map() must be string, but got map<string,int>;; line 1 pos 7
+
+
+-- !query 6
+select from_csv()
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function from_csv. Expected: one of 2 and 3; Found: 0; line 1 pos 7

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
new file mode 100644
index 0000000..38a2143
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("from_csv with empty options") {
+    val df = Seq("1").toDS()
+    val schema = "a int"
+
+    checkAnswer(
+      df.select(from_csv($"value", lit(schema), Map[String, String]().asJava)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("from_csv with option") {
+    val df = Seq("26/08/2015 18:00").toDS()
+    val schema = new StructType().add("time", TimestampType)
+    val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
+
+    checkAnswer(
+      df.select(from_csv($"value", schema, options)),
+      Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+  }
+
+
+  test("checking the columnNameOfCorruptRecord option") {
+    val columnNameOfCorruptRecord = "_unparsed"
+    val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS()
+    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+    val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
+    val df2 = df
+      .select(from_csv($"value", schemaWithCorrField1, Map(
+        "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord)))
+
+    checkAnswer(df2, Seq(
+      Row(Row(null, null, "0,2013-111-11 12:13:14")),
+      Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 57e36e0..6b64f2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.types._
 
 class CSVInferSchemaSuite extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
deleted file mode 100644
index 60fcbd2..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import org.apache.spark.SparkFunSuite
-
-class CSVUtilsSuite extends SparkFunSuite {
-  test("Can parse escaped characters") {
-    assert(CSVUtils.toChar("""\t""") === '\t')
-    assert(CSVUtils.toChar("""\r""") === '\r')
-    assert(CSVUtils.toChar("""\b""") === '\b')
-    assert(CSVUtils.toChar("""\f""") === '\f')
-    assert(CSVUtils.toChar("""\"""") === '\"')
-    assert(CSVUtils.toChar("""\'""") === '\'')
-    assert(CSVUtils.toChar("""\u0000""") === '\u0000')
-    assert(CSVUtils.toChar("""\\""") === '\\')
-  }
-
-  test("Does not accept delimiter larger than one character") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("ab")
-    }
-    assert(exception.getMessage.contains("cannot be more than one character"))
-  }
-
-  test("Throws exception for unsupported escaped characters") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("""\1""")
-    }
-    assert(exception.getMessage.contains("Unsupported special character for delimiter"))
-  }
-
-  test("string with one backward slash is prohibited") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("""\""")
-    }
-    assert(exception.getMessage.contains("Single backslash is prohibited"))
-  }
-
-  test("output proper error message for empty string") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("")
-    }
-    assert(exception.getMessage.contains("Delimiter cannot be empty string"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 458edb2..6f23114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.math.BigDecimal
-import java.util.Locale
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

