Repository: spark
Updated Branches:
  refs/heads/master 9d4dd7992 -> e9af9460b
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
deleted file mode 100644
index 492a21b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.nio.charset.StandardCharsets
-import java.util.{Locale, TimeZone}
-
-import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util._
-
-class CSVOptions(
-    @transient val parameters: CaseInsensitiveMap[String],
-    val columnPruning: Boolean,
-    defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String)
-  extends Logging with Serializable {
-
-  def this(
-    parameters: Map[String, String],
-    columnPruning: Boolean,
-    defaultTimeZoneId: String,
-    defaultColumnNameOfCorruptRecord: String = "") = {
-    this(
-      CaseInsensitiveMap(parameters),
-      columnPruning,
-      defaultTimeZoneId,
-      defaultColumnNameOfCorruptRecord)
-  }
-
-  private def getChar(paramName: String, default: Char): Char = {
-    val paramValue = parameters.get(paramName)
-    paramValue match {
-      case None => default
-      case Some(null) => default
-      case Some(value) if value.length == 0 => '\u0000'
-      case Some(value) if value.length == 1 => value.charAt(0)
-      case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
-    }
-  }
-
-  private def getInt(paramName: String, default: Int): Int = {
-    val paramValue = parameters.get(paramName)
-    paramValue match {
-      case None => default
-      case Some(null) => default
-      case Some(value) => try {
-        value.toInt
-      } catch {
-        case e: NumberFormatException =>
-          throw new RuntimeException(s"$paramName should be an integer. Found $value")
-      }
-    }
-  }
-
-  private def getBool(paramName: String, default: Boolean = false): Boolean = {
-    val param = parameters.getOrElse(paramName, default.toString)
-    if (param == null) {
-      default
-    } else if (param.toLowerCase(Locale.ROOT) == "true") {
-      true
-    } else if (param.toLowerCase(Locale.ROOT) == "false") {
-      false
-    } else {
-      throw new Exception(s"$paramName flag can be true or false")
-    }
-  }
-
-  val delimiter = CSVUtils.toChar(
-    parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode: ParseMode =
-    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
-  val charset = parameters.getOrElse("encoding",
-    parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
-
-  val quote = getChar("quote", '\"')
-  val escape = getChar("escape", '\\')
-  val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") match {
-    case None => None
-    case Some(null) => None
-    case Some(value) if value.length == 0 => None
-    case Some(value) if value.length == 1 => Some(value.charAt(0))
-    case _ =>
-      throw new RuntimeException("charToEscapeQuoteEscaping cannot be more than one character")
-  }
-  val comment = getChar("comment", '\u0000')
-
-  val headerFlag = getBool("header")
-  val inferSchemaFlag = getBool("inferSchema")
-  val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
-  val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
-
-  // For write, both options were `true` by default. We leave it as `true` for
-  // backwards compatibility.
-  val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
-  val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)
-
-  val columnNameOfCorruptRecord =
-    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
-
-  val nullValue = parameters.getOrElse("nullValue", "")
-
-  val nanValue = parameters.getOrElse("nanValue", "NaN")
-
-  val positiveInf = parameters.getOrElse("positiveInf", "Inf")
-  val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
-
-
-  val compressionCodec: Option[String] = {
-    val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CompressionCodecs.getCodecClassName)
-  }
-
-  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
-    parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
-
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
-
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
-
-  val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
-
-  val maxColumns = getInt("maxColumns", 20480)
-
-  val maxCharsPerColumn = getInt("maxCharsPerColumn", -1)
-
-  val escapeQuotes = getBool("escapeQuotes", true)
-
-  val quoteAll = getBool("quoteAll", false)
-
-  val inputBufferSize = 128
-
-  val isCommentSet = this.comment != '\u0000'
-
-  val samplingRatio =
-    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-
-  /**
-   * Forcibly apply the specified or inferred schema to datasource files.
-   * If the option is enabled, headers of CSV files will be ignored.
-   */
-  val enforceSchema = getBool("enforceSchema", default = true)
-
-
-  /**
-   * String representation of an empty value in read and in write.
-   */
-  val emptyValue = parameters.get("emptyValue")
-  /**
-   * The string is returned when CSV reader doesn't have any characters for input value,
-   * or an empty quoted string `""`. Default value is empty string.
-   */
-  val emptyValueInRead = emptyValue.getOrElse("")
-  /**
-   * The value is used instead of an empty string in write. Default value is `""`
-   */
-  val emptyValueInWrite = emptyValue.getOrElse("\"\"")
-
-  def asWriterSettings: CsvWriterSettings = {
-    val writerSettings = new CsvWriterSettings()
-    val format = writerSettings.getFormat
-    format.setDelimiter(delimiter)
-    format.setQuote(quote)
-    format.setQuoteEscape(escape)
-    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
-    writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
-    writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
-    writerSettings.setNullValue(nullValue)
-    writerSettings.setEmptyValue(emptyValueInWrite)
-    writerSettings.setSkipEmptyLines(true)
-    writerSettings.setQuoteAllFields(quoteAll)
-    writerSettings.setQuoteEscapingEnabled(escapeQuotes)
-    writerSettings
-  }
-
-  def asParserSettings: CsvParserSettings = {
-    val settings = new CsvParserSettings()
-    val format = settings.getFormat
-    format.setDelimiter(delimiter)
-    format.setQuote(quote)
-    format.setQuoteEscape(escape)
-    charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
-    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
-    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
-    settings.setReadInputOnSeparateThread(false)
-    settings.setInputBufferSize(inputBufferSize)
-    settings.setMaxColumns(maxColumns)
-    settings.setNullValue(nullValue)
-    settings.setEmptyValue(emptyValueInRead)
-    settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
-    settings
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 0a7473c..21fabac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.catalyst.csv.CSVExprUtils
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.functions._
 
 object CSVUtils {
@@ -40,16 +42,6 @@ object CSVUtils {
   }
 
   /**
-   * Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
-   * This is currently being used in CSV reading path and CSV schema inference.
-   */
-  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    iter.filter { line =>
-      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
-    }
-  }
-
-  /**
    * Skip the given first line so that only data can remain in a dataset.
    * This is similar with `dropHeaderLine` below and currently being used in CSV schema inference.
    */
@@ -67,29 +59,6 @@ object CSVUtils {
     }
   }
 
-  def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    if (options.isCommentSet) {
-      val commentPrefix = options.comment.toString
-      iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
-      }
-    } else {
-      iter.dropWhile(_.trim.isEmpty)
-    }
-  }
-
-  /**
-   * Extracts header and moves iterator forward so that only data remains in it
-   */
-  def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
-    val nonEmptyLines = skipComments(iter, options)
-    if (nonEmptyLines.hasNext) {
-      Some(nonEmptyLines.next())
-    } else {
-      None
-    }
-  }
-
   /**
    * Generates a header from the given row which is null-safe and duplicate-safe.
    */
@@ -133,35 +102,6 @@ object CSVUtils {
   }
 
   /**
-   * Helper method that converts string representation of a character to actual character.
-   * It handles some Java escaped strings and throws exception if given string is longer than one
-   * character.
-   */
-  @throws[IllegalArgumentException]
-  def toChar(str: String): Char = {
-    (str: Seq[Char]) match {
-      case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
-      case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." +
-        " It has special meaning as beginning of an escape sequence." +
-        " To get the backslash character, pass a string with two backslashes as the delimiter.")
-      case Seq(c) => c
-      case Seq('\\', 't') => '\t'
-      case Seq('\\', 'r') => '\r'
-      case Seq('\\', 'b') => '\b'
-      case Seq('\\', 'f') => '\f'
-      // In case user changes quote char and uses \" as delimiter in options
-      case Seq('\\', '\"') => '\"'
-      case Seq('\\', '\'') => '\''
-      case Seq('\\', '\\') => '\\'
-      case _ if str == """\u0000""" => '\u0000'
-      case Seq('\\', _) =>
-        throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
-      case _ =>
-        throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
-    }
-  }
-
   /**
    * Sample CSV dataset as configured by `samplingRatio`.
    */
  def sample(csv: Dataset[String], options: CSVOptions): Dataset[String] = {
@@ -186,4 +126,7 @@ object CSVUtils {
       csv.sample(withReplacement = false, options.samplingRatio, 1)
     }
   }
+
+  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] =
+    CSVExprUtils.filterCommentAndEmpty(iter, options)
 }
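The local filterCommentAndEmpty now simply delegates to CSVExprUtils in the catalyst module. A self-contained sketch of the filtering rule that delegation preserves (the predicate is the same as in the deleted method):

    // Keeps lines that are non-empty after trimming and do not start with the
    // comment character; mirrors the deleted CSVUtils.filterCommentAndEmpty.
    def filterCommentAndEmptySketch(iter: Iterator[String], comment: Char): Iterator[String] =
      iter.filter { line =>
        line.trim.nonEmpty && !line.startsWith(comment.toString)
      }

    // e.g. filterCommentAndEmptySketch(Iterator("# note", "", "1,2"), '#') yields only "1,2"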
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index 4082a0d..37d9d9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -22,6 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
deleted file mode 100644
index fbd19c6..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import java.io.InputStream
-import java.math.BigDecimal
-
-import scala.util.Try
-import scala.util.control.NonFatal
-
-import com.univocity.parsers.csv.CsvParser
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
-import org.apache.spark.sql.execution.datasources.FailureSafeParser
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-/**
- * Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
- *
- * @param dataSchema The CSV data schema that is specified by the user, or inferred from underlying
- *                   data files.
- * @param requiredSchema The schema of the data that should be output for each row. This should be a
- *                       subset of the columns in dataSchema.
- * @param options Configuration options for a CSV parser.
- */
-class UnivocityParser(
-    dataSchema: StructType,
-    requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
-  require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
-    s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
-      s"dataSchema (${dataSchema.catalogString}).")
-
-  def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
-
-  // A `ValueConverter` is responsible for converting the given value to a desired type.
-  private type ValueConverter = String => Any
-
-  // This index is used to reorder parsed tokens
-  private val tokenIndexArr =
-    requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
-
-  // When column pruning is enabled, the parser only parses the required columns based on
-  // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else dataSchema
-
-  val tokenizer = {
-    val parserSetting = options.asParserSettings
-    // When to-be-parsed schema is shorter than the to-be-read data schema, we let Univocity CSV
-    // parser select a sequence of fields for reading by their positions.
-    // if (options.columnPruning && requiredSchema.length < dataSchema.length) {
-    if (parsedSchema.length < dataSchema.length) {
-      parserSetting.selectIndexes(tokenIndexArr: _*)
-    }
-    new CsvParser(parserSetting)
-  }
-
-  private val row = new GenericInternalRow(requiredSchema.length)
-
-  // Retrieve the raw record string.
-  private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
-  }
-
-  // This parser first picks some tokens from the input tokens, according to the required schema,
-  // then parse these tokens and put the values in a row, with the order specified by the required
-  // schema.
-  //
-  //  For example, let's say there is CSV data as below:
-  //
-  //    a,b,c
-  //    1,2,A
-  //
-  //  So the CSV data schema is: ["a", "b", "c"]
-  //  And let's say the required schema is: ["c", "b"]
-  //
-  //  with the input tokens,
-  //
-  //    input tokens - [1, 2, "A"]
-  //
-  //  Each input token is placed in each output row's position by mapping these. In this case,
-  //
-  //    output row - ["A", 2]
-  private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-  }
-
-  /**
-   * Create a converter which converts the string value to a value according to a desired type.
-   * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`).
-   *
-   * For other nullable types, returns null if it is null or equals to the value specified
-   * in `nullValue` option.
-   */
-  def makeConverter(
-      name: String,
-      dataType: DataType,
-      nullable: Boolean = true,
-      options: CSVOptions): ValueConverter = dataType match {
-    case _: ByteType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toByte)
-
-    case _: ShortType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toShort)
-
-    case _: IntegerType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toInt)
-
-    case _: LongType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toLong)
-
-    case _: FloatType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) {
-        case options.nanValue => Float.NaN
-        case options.negativeInf => Float.NegativeInfinity
-        case options.positiveInf => Float.PositiveInfinity
-        case datum => datum.toFloat
-      }
-
-    case _: DoubleType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) {
-        case options.nanValue => Double.NaN
-        case options.negativeInf => Double.NegativeInfinity
-        case options.positiveInf => Double.PositiveInfinity
-        case datum => datum.toDouble
-      }
-
-    case _: BooleanType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(_.toBoolean)
-
-    case dt: DecimalType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        val value = new BigDecimal(datum.replaceAll(",", ""))
-        Decimal(value, dt.precision, dt.scale)
-      }
-
-    case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          DateTimeUtils.stringToTime(datum).getTime * 1000L
-        }
-      }
-
-    case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-          // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-        }
-      }
-
-    case _: StringType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
-
-    case udt: UserDefinedType[_] => (datum: String) =>
-      makeConverter(name, udt.sqlType, nullable, options)
-
-    // We don't actually hit this exception though, we keep it for understandability
-    case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
-  }
-
-  private def nullSafeDatum(
-      datum: String,
-      name: String,
-      nullable: Boolean,
-      options: CSVOptions)(converter: ValueConverter): Any = {
-    if (datum == options.nullValue || datum == null) {
-      if (!nullable) {
-        throw new RuntimeException(s"null value found but field $name is not nullable.")
-      }
-      null
-    } else {
-      converter.apply(datum)
-    }
-  }
-
-  /**
-   * Parses a single CSV string and turns it into either one resulting row or no row (if the
-   * the record is malformed).
-   */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
-
-  private val getToken = if (options.columnPruning) {
-    (tokens: Array[String], index: Int) => tokens(index)
-  } else {
-    (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
-  }
-
-  private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens == null) {
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => None,
-        new RuntimeException("Malformed CSV record"))
-    } else if (tokens.length != parsedSchema.length) {
-      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
-      // However, we still have chance to parse some of the tokens, by adding extra null tokens in
-      // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (parsedSchema.length > tokens.length) {
-        tokens ++ new Array[String](parsedSchema.length - tokens.length)
-      } else {
-        tokens.take(parsedSchema.length)
-      }
-      def getPartialResult(): Option[InternalRow] = {
-        try {
-          Some(convert(checkedTokens))
-        } catch {
-          case _: BadRecordException => None
-        }
-      }
-      // For records with less or more tokens than the schema, tries to return partial results
-      // if possible.
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => getPartialResult(),
-        new RuntimeException("Malformed CSV record"))
-    } else {
-      try {
-        // When the length of the returned tokens is identical to the length of the parsed schema,
-        // we just need to convert the tokens that correspond to the required columns.
-        var i = 0
-        while (i < requiredSchema.length) {
-          row(i) = valueConverters(i).apply(getToken(tokens, i))
-          i += 1
-        }
-        row
-      } catch {
-        case NonFatal(e) =>
-          // For corrupted records with the number of tokens same as the schema,
-          // CSV reader doesn't support partial results. All fields other than the field
-          // configured by `columnNameOfCorruptRecord` are set to `null`.
-          throw BadRecordException(() => getCurrentInput, () => None, e)
-      }
-    }
-  }
-}
-
-private[csv] object UnivocityParser {
-
-  /**
-   * Parses a stream that contains CSV strings and turns it into an iterator of tokens.
-   */
-  def tokenizeStream(
-      inputStream: InputStream,
-      shouldDropHeader: Boolean,
-      tokenizer: CsvParser): Iterator[Array[String]] = {
-    val handleHeader: () => Unit =
-      () => if (shouldDropHeader) tokenizer.parseNext
-
-    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
-  }
-
-  /**
-   * Parses a stream that contains CSV strings and turns it into an iterator of rows.
-   */
-  def parseStream(
-      inputStream: InputStream,
-      parser: UnivocityParser,
-      headerChecker: CSVHeaderChecker,
-      schema: StructType): Iterator[InternalRow] = {
-    val tokenizer = parser.tokenizer
-    val safeParser = new FailureSafeParser[Array[String]](
-      input => Seq(parser.convert(input)),
-      parser.options.parseMode,
-      schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
-
-    val handleHeader: () => Unit =
-      () => headerChecker.checkHeaderColumnNames(tokenizer)
-
-    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
-      safeParser.parse(tokens)
-    }.flatten
-  }
-
-  private def convertStream[T](
-      inputStream: InputStream,
-      tokenizer: CsvParser,
-      handleHeader: () => Unit)(
-      convert: Array[String] => T) = new Iterator[T] {
-    tokenizer.beginParsing(inputStream)
-
-    // We can handle header here since here the stream is open.
-    handleHeader()
-
-    private var nextRecord = tokenizer.parseNext()
-
-    override def hasNext: Boolean = nextRecord != null
-
-    override def next(): T = {
-      if (!hasNext) {
-        throw new NoSuchElementException("End of stream")
-      }
-      val curRecord = convert(nextRecord)
-      nextRecord = tokenizer.parseNext()
-      curRecord
-    }
-  }
-
-  /**
-   * Parses an iterator that contains CSV strings and turns it into an iterator of rows.
-   */
-  def parseIterator(
-      lines: Iterator[String],
-      parser: UnivocityParser,
-      headerChecker: CSVHeaderChecker,
-      schema: StructType): Iterator[InternalRow] = {
-    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
-
-    val options = parser.options
-
-    val filteredLines: Iterator[String] = CSVUtils.filterCommentAndEmpty(lines, options)
-
-    val safeParser = new FailureSafeParser[String](
-      input => Seq(parser.parse(input)),
-      parser.options.parseMode,
-      schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
-    filteredLines.flatMap(safeParser.parse)
-  }
-}
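A standalone sketch of the null handling that makeConverter wraps around every type-specific converter via nullSafeDatum above (simplified: the field name and CSVOptions plumbing are dropped for illustration):

    // A datum equal to the configured nullValue, or a literal null, becomes null;
    // everything else is handed to the type-specific converter.
    def nullSafeSketch(datum: String, nullValue: String, nullable: Boolean)
        (convert: String => Any): Any = {
      if (datum == nullValue || datum == null) {
        if (!nullable) throw new RuntimeException("null value found but field is not nullable")
        null
      } else {
        convert(datum)
      }
    }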
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 76f5837..c7608e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
+import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4247d31..8def996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3854,6 +3854,38 @@ object functions {
   @scala.annotation.varargs
   def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) }
 
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options and the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    CsvToStructs(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing CSV data.
+   * @param schema the schema to use when parsing the CSV string
+   * @param options options to control how the CSV is parsed. accepts the same options and the
+   *                CSV data source.
+   *
+   * @group collection_funcs
+   * @since 3.0.0
+   */
+  def from_csv(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap))
+  }
+
 // scalastyle:off line.size.limit
 // scalastyle:off parameter.number
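A usage sketch for the new overloads, assuming the usual spark-shell context (a SparkSession with `spark.implicits._` in scope); only APIs added by this patch are exercised:

    import org.apache.spark.sql.functions.from_csv
    import org.apache.spark.sql.types._

    // Scala-specific overload: StructType schema plus a Scala options map.
    val schema = new StructType().add("a", IntegerType).add("f", FloatType)
    val parsed = Seq("1, 3.14").toDF("value")
      .select(from_csv($"value", schema, Map.empty[String, String]))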
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
new file mode 100644
index 0000000..d2214fd
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql
@@ -0,0 +1,9 @@
+-- from_csv
+select from_csv('1, 3.14', 'a INT, f FLOAT');
+select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select from_csv('1', 1);
+select from_csv('1', 'a InvalidType');
+select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'));
+select from_csv('1', 'a INT', map('mode', 1));
+select from_csv();

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
new file mode 100644
index 0000000..15dbe36
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -0,0 +1,69 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 7
+
+
+-- !query 0
+select from_csv('1, 3.14', 'a INT, f FLOAT')
+-- !query 0 schema
+struct<csvtostructs(1, 3.14):struct<a:int,f:float>>
+-- !query 0 output
+{"a":1,"f":3.14}
+
+
+-- !query 1
+select from_csv('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 1 schema
+struct<csvtostructs(26/08/2015):struct<time:timestamp>>
+-- !query 1 output
+{"time":2015-08-26 00:00:00.0}
+
+
+-- !query 2
+select from_csv('1', 1)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+org.apache.spark.sql.AnalysisException
+Schema should be specified in DDL format as a string literal instead of 1;; line 1 pos 7
+
+
+-- !query 3
+select from_csv('1', 'a InvalidType')
+-- !query 3 schema
+struct<>
+-- !query 3 output
+org.apache.spark.sql.AnalysisException
+
+DataType invalidtype is not supported.(line 1, pos 2)
+
+== SQL ==
+a InvalidType
+--^^^
+; line 1 pos 7
+
+
+-- !query 4
+select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'))
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 5
+select from_csv('1', 'a INT', map('mode', 1))
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+A type of keys and values in map() must be string, but got map<string,int>;; line 1 pos 7
+
+
+-- !query 6
+select from_csv()
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function from_csv. Expected: one of 2 and 3; Found: 0; line 1 pos 7
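The SQL tests above pass the schema as a DDL-formatted string literal. On the Scala side, an equivalent StructType can be built from the same string with StructType.fromDDL (an existing API, not part of this patch):

    import org.apache.spark.sql.types.StructType

    // "a INT, f FLOAT" is the same DDL string the SQL tests use.
    val ddlSchema: StructType = StructType.fromDDL("a INT, f FLOAT")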
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
new file mode 100644
index 0000000..38a2143
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("from_csv with empty options") {
+    val df = Seq("1").toDS()
+    val schema = "a int"
+
+    checkAnswer(
+      df.select(from_csv($"value", lit(schema), Map[String, String]().asJava)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("from_csv with option") {
+    val df = Seq("26/08/2015 18:00").toDS()
+    val schema = new StructType().add("time", TimestampType)
+    val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
+
+    checkAnswer(
+      df.select(from_csv($"value", schema, options)),
+      Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+  }
+
+
+  test("checking the columnNameOfCorruptRecord option") {
+    val columnNameOfCorruptRecord = "_unparsed"
+    val df = Seq("0,2013-111-11 12:13:14", "1,1983-08-04").toDS()
+    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+    val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
+    val df2 = df
+      .select(from_csv($"value", schemaWithCorrField1, Map(
+        "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord)))
+
+    checkAnswer(df2, Seq(
+      Row(Row(null, null, "0,2013-111-11 12:13:14")),
+      Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 57e36e0..6b64f2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.types._
 
 class CSVInferSchemaSuite extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
deleted file mode 100644
index 60fcbd2..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.csv
-
-import org.apache.spark.SparkFunSuite
-
-class CSVUtilsSuite extends SparkFunSuite {
-  test("Can parse escaped characters") {
-    assert(CSVUtils.toChar("""\t""") === '\t')
-    assert(CSVUtils.toChar("""\r""") === '\r')
-    assert(CSVUtils.toChar("""\b""") === '\b')
-    assert(CSVUtils.toChar("""\f""") === '\f')
-    assert(CSVUtils.toChar("""\"""") === '\"')
-    assert(CSVUtils.toChar("""\'""") === '\'')
-    assert(CSVUtils.toChar("""\u0000""") === '\u0000')
-    assert(CSVUtils.toChar("""\\""") === '\\')
-  }
-
-  test("Does not accept delimiter larger than one character") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("ab")
-    }
-    assert(exception.getMessage.contains("cannot be more than one character"))
-  }
-
-  test("Throws exception for unsupported escaped characters") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("""\1""")
-    }
-    assert(exception.getMessage.contains("Unsupported special character for delimiter"))
-  }
-
-  test("string with one backward slash is prohibited") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("""\""")
-    }
-    assert(exception.getMessage.contains("Single backslash is prohibited"))
-  }
-
-  test("output proper error message for empty string") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVUtils.toChar("")
-    }
-    assert(exception.getMessage.contains("Delimiter cannot be empty string"))
-  }
-}
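CSVUtilsSuite is deleted here because the toChar helper leaves this module (presumably alongside filterCommentAndEmpty, into catalyst's CSVExprUtils). An abbreviated, self-contained sketch of the escape handling those tests covered:

    // Abbreviated: the full helper also maps \r, \b, \f, \", \' and \u0000.
    def toCharSketch(str: String): Char = (str: Seq[Char]) match {
      case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
      case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited")
      case Seq(c) => c
      case Seq('\\', 't') => '\t'
      case Seq('\\', '\\') => '\\'
      case _ => throw new IllegalArgumentException(s"Unsupported delimiter: $str")
    }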
http://git-wip-us.apache.org/repos/asf/spark/blob/e9af9460/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 458edb2..6f23114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.math.BigDecimal
-import java.util.Locale
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
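Finally, an end-to-end sketch of the corrupt-record behaviour that CsvFunctionsSuite pins down, again assuming a spark-shell context with `spark.implicits._` imported:

    import org.apache.spark.sql.functions.from_csv
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("a", IntegerType).add("b", TimestampType).add("_unparsed", StringType)
    // The malformed timestamp makes the whole input land in the _unparsed column.
    val parsed = Seq("0,2013-111-11 12:13:14").toDF("value").select(
      from_csv($"value", schema,
        Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_unparsed")))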