[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22676 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223751038

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
   def parseIterator(
       lines: Iterator[String],
       parser: UnivocityParser,
+      headerChecker: CSVHeaderChecker,
       schema: StructType): Iterator[InternalRow] = {
+    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --

Ditto. It was already done this way. Let's keep the original code path as is, since this PR only aims to organize the code.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223749938

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser): Iterator[Array[String]] = {
-    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+    val handleHeader: () => Unit =
+      () => if (shouldDropHeader) tokenizer.parseNext
+
+    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
   /**
    * Parses a stream that contains CSV strings and turns it into an iterator of rows.
    */
   def parseStream(
       inputStream: InputStream,
-      shouldDropHeader: Boolean,
       parser: UnivocityParser,
-      schema: StructType,
-      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+      headerChecker: CSVHeaderChecker,
+      schema: StructType): Iterator[InternalRow] = {
     val tokenizer = parser.tokenizer
     val safeParser = new FailureSafeParser[Array[String]](
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord,
       parser.options.multiLine)
-    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+    val handleHeader: () => Unit =
+      () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
       safeParser.parse(tokens)
     }.flatten
   }
   private def convertStream[T](
       inputStream: InputStream,
-      shouldDropHeader: Boolean,
       tokenizer: CsvParser,
-      checkHeader: Array[String] => Unit = _ => ())(
+      handleHeader: () => Unit)(
       convert: Array[String] => T) = new Iterator[T] {
     tokenizer.beginParsing(inputStream)
-    private var nextRecord = {
-      if (shouldDropHeader) {
-        val firstRecord = tokenizer.parseNext()
-        checkHeader(firstRecord)
-      }
-      tokenizer.parseNext()
-    }
+
+    // We can handle header here since here the stream is open.
+    handleHeader()
--- End diff --

It is, but I think it was already done this way.
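The callback structure in this diff, where `convertStream` receives a `handleHeader: () => Unit` thunk instead of a `shouldDropHeader` flag plus a `checkHeader` function, can be modelled outside Spark roughly as follows. This is a hypothetical, simplified sketch, not the Spark code: a plain `Iterator[Array[String]]` stands in for univocity's `CsvParser`, and a `checkHeader` function stands in for `CSVHeaderChecker`.

```scala
// Hypothetical model of the handleHeader refactoring; not Spark's UnivocityParser.
// An Iterator[Array[String]] stands in for the univocity CsvParser (assumption).
object ConvertStreamSketch {
  type Tokens = Array[String]

  // The header handling is passed in as a thunk and invoked once, when the
  // iterator is constructed (i.e. while the "stream" is open).
  def convertStream[T](tokenizer: Iterator[Tokens], handleHeader: () => Unit)(
      convert: Tokens => T): Iterator[T] = {
    handleHeader()
    tokenizer.map(convert)
  }

  // Schema-inference path: the thunk only drops the header, no check.
  def tokenizeStream(tokenizer: Iterator[Tokens], shouldDropHeader: Boolean): Iterator[Tokens] = {
    val handleHeader: () => Unit = () => {
      if (shouldDropHeader && tokenizer.hasNext) tokenizer.next()
    }
    convertStream(tokenizer, handleHeader)(tokens => tokens)
  }

  // Parsing path: the thunk consumes the header and hands it to a checker.
  def parseStream(tokenizer: Iterator[Tokens], checkHeader: Tokens => Unit): Iterator[String] = {
    val handleHeader: () => Unit = () => {
      if (tokenizer.hasNext) checkHeader(tokenizer.next())
    }
    convertStream(tokenizer, handleHeader)(tokens => tokens.mkString("|"))
  }
}
```

As in the diff, the thunk runs once when the iterator is built, so each caller decides what "handling the header" means while `convertStream` itself stays agnostic.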
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223749425

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Let's leave it as is. It more or less follows the existing naming convention within each datasource.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223748041

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val actualSchema =
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-      val firstLine = maybeFirstLine.get
-      val parser = new CsvParser(parsedOptions.asParserSettings)
-      val columnNames = parser.parseLine(firstLine)
-      CSVDataSource.checkHeaderColumnNames(
+    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+      val headerChecker = new CSVHeaderChecker(
         actualSchema,
-        columnNames,
-        csvDataset.getClass.getCanonicalName,
-        parsedOptions.enforceSchema,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        parsedOptions,
+        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+      headerChecker.checkHeaderColumnNames(firstLine)
       filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
-    } else {
-      filteredLines.rdd
-    }
+    }.getOrElse(filteredLines.rdd)
--- End diff --

I don't remember exactly. It looks like we can change it to `Dataset`.
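The `Option.map { ... }.getOrElse(...)` rewrite of `linesWithoutHeader` can be sketched with plain collections. In this hypothetical sketch, `Seq[Seq[String]]` stands in for the RDD's partitions, `checkHeader` for `CSVHeaderChecker.checkHeaderColumnNames`, and `filterHeaderLine` is assumed to drop every line equal to the header line. It also assumes `maybeFirstLine` is already `None` when the header option is off, which the removed `parsedOptions.headerFlag &&` condition suggests but the diff does not show.

```scala
// Hypothetical sketch of the Option-based linesWithoutHeader; not Spark's DataFrameReader.
object LinesWithoutHeaderSketch {
  // Assumed behaviour of a header filter: drop lines identical to the header line.
  def filterHeaderLine(lines: Iterator[String], firstLine: String): Iterator[String] =
    lines.filterNot(_ == firstLine)

  def linesWithoutHeader(
      partitions: Seq[Seq[String]],
      maybeFirstLine: Option[String],
      checkHeader: String => Unit): Seq[Seq[String]] =
    maybeFirstLine.map { firstLine =>
      // Check the header once on the driver side, then filter it out of every partition.
      checkHeader(firstLine)
      partitions.map(p => filterHeaderLine(p.iterator, firstLine).toList)
    }.getOrElse(partitions) // no header line: keep the input untouched
}
```

Compared with the removed `if/else`, the `Option` form avoids the separate `isDefined` check and `.get` call.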
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223744025

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val actualSchema =
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-      val firstLine = maybeFirstLine.get
-      val parser = new CsvParser(parsedOptions.asParserSettings)
-      val columnNames = parser.parseLine(firstLine)
-      CSVDataSource.checkHeaderColumnNames(
+    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+      val headerChecker = new CSVHeaderChecker(
         actualSchema,
-        columnNames,
-        csvDataset.getClass.getCanonicalName,
-        parsedOptions.enforceSchema,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        parsedOptions,
+        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --

Makes sense. If it's just `toString`, I can of course fix it here, since the change is small, even though it's orthogonal to this PR.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223743548

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
     maybeFirstLine.map(csvParser.parseLine(_)) match {
       case Some(firstRow) if firstRow != null =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --

Because most of this code uses the `CSVUtils...` form, I just followed it.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223729530

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Is the `CSV` prefix in `CSVHeaderChecker` necessary? The class is already in the `csv` package, so it should be clear that it checks CSV headers.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223741059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
   def parseIterator(
       lines: Iterator[String],
       parser: UnivocityParser,
+      headerChecker: CSVHeaderChecker,
       schema: StructType): Iterator[InternalRow] = {
+    headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --

Same question here. I would prefer to consume the input iterator lazily. That is one of the advantages of iterators: unlike collections, for example, an iterator only performs work when you explicitly call it (`hasNext` or `next`).
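The laziness argument can be seen with a side-effect counter. This small Scala illustration (not Spark code) compares `map` on a strict `List` with `map` on an `Iterator`: the strict version runs the function for every element immediately, while the iterator version does nothing until an element is pulled.

```scala
// Illustration of strict vs. lazy evaluation; unrelated to any Spark API.
object LazinessSketch {
  /** Returns (pulls after strict map, pulls before consuming, pulls after one next()). */
  def demo(): (Int, Int, Int) = {
    val source = List("header", "row1", "row2")
    var pulled = 0

    // Strict: List.map runs the function for every element right away.
    source.map { s => pulled += 1; s }
    val afterStrictMap = pulled

    // Lazy: Iterator.map only wraps the function; nothing runs yet.
    pulled = 0
    val lazyRows = source.iterator.map { s => pulled += 1; s }
    val beforeConsuming = pulled

    // Work happens only when the caller explicitly pulls an element.
    lazyRows.next()
    (afterStrictMap, beforeConsuming, pulled)
  }
}
```

Calling `LazinessSketch.demo()` yields `(3, 0, 1)`: three evaluations for the list, none for the iterator until `next()` is called, then exactly one.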
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223730392

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
@@ -139,14 +138,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         parsedOptions)
+      val schema = if (columnPruning) requiredSchema else dataSchema
+      val headerChecker = new CSVHeaderChecker(
+        schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
--- End diff --

`isStartOfFile = file.start == 0`
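Why `file.start == 0` is passed as `isStartOfFile`: per the `CSVHeaderChecker` scaladoc, the flag says whether the partition being processed is the start of the file, and only a split that begins at offset 0 can contain the header line. The sketch below uses hypothetical stand-ins (`FileSplit`, `HeaderCheckConfig`) rather than Spark's `PartitionedFile` and `CSVHeaderChecker`, and shows the named-argument style suggested in the review.

```scala
// Hypothetical stand-ins; FileSplit and HeaderCheckConfig are not Spark classes.
object StartOfFileSketch {
  final case class FileSplit(filePath: String, start: Long)
  final case class HeaderCheckConfig(source: String, isStartOfFile: Boolean = false)

  def configFor(split: FileSplit): HeaderCheckConfig =
    // Naming the Boolean argument documents its meaning at the call site.
    HeaderCheckConfig(source = s"CSV file: ${split.filePath}", isStartOfFile = split.start == 0)

  // Only a split that begins at byte offset 0 can contain the header line.
  def shouldCheckHeader(headerFlag: Boolean, config: HeaderCheckConfig): Boolean =
    headerFlag && config.isStartOfFile
}
```

With a bare positional `file.start == 0`, a reader has to look up the constructor to know what the Boolean means; the named form makes the call self-describing.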
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223737261

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser): Iterator[Array[String]] = {
-    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+    val handleHeader: () => Unit =
+      () => if (shouldDropHeader) tokenizer.parseNext
+
+    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
   /**
    * Parses a stream that contains CSV strings and turns it into an iterator of rows.
    */
   def parseStream(
       inputStream: InputStream,
-      shouldDropHeader: Boolean,
       parser: UnivocityParser,
-      schema: StructType,
-      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+      headerChecker: CSVHeaderChecker,
+      schema: StructType): Iterator[InternalRow] = {
     val tokenizer = parser.tokenizer
     val safeParser = new FailureSafeParser[Array[String]](
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord,
       parser.options.multiLine)
-    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+    val handleHeader: () => Unit =
+      () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+    convertStream(inputStream, tokenizer, handleHeader) { tokens =>
       safeParser.parse(tokens)
     }.flatten
   }
   private def convertStream[T](
       inputStream: InputStream,
-      shouldDropHeader: Boolean,
       tokenizer: CsvParser,
-      checkHeader: Array[String] => Unit = _ => ())(
+      handleHeader: () => Unit)(
       convert: Array[String] => T) = new Iterator[T] {
     tokenizer.beginParsing(inputStream)
-    private var nextRecord = {
-      if (shouldDropHeader) {
-        val firstRecord = tokenizer.parseNext()
-        checkHeader(firstRecord)
-      }
-      tokenizer.parseNext()
-    }
+
+    // We can handle header here since here the stream is open.
+    handleHeader()
--- End diff --

It looks slightly strange that we consume data from the input before the upper layer starts reading it.
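One way to address the concern about consuming input before the caller reads anything is to defer `handleHeader()` until the first `hasNext`/`next` call. This is a hypothetical sketch, not what the PR does (the PR keeps the eager call inside `convertStream`); an `Iterator[Array[String]]` stands in for the tokenizer.

```scala
// Hypothetical lazy variant of convertStream; the tokenizer is modelled as an Iterator.
object LazyHeaderSketch {
  def convertStreamLazily[T](tokenizer: Iterator[Array[String]], handleHeader: () => Unit)(
      convert: Array[String] => T): Iterator[T] = new Iterator[T] {
    private var headerHandled = false

    // Runs handleHeader at most once, on the first hasNext/next call.
    private def ensureHeaderHandled(): Unit =
      if (!headerHandled) {
        handleHeader()
        headerHandled = true
      }

    override def hasNext: Boolean = { ensureHeaderHandled(); tokenizer.hasNext }

    override def next(): T = { ensureHeaderHandled(); convert(tokenizer.next()) }
  }
}
```

Building the iterator is now free of side effects; the header is read (and possibly checked) only when the upper layer actually starts pulling rows.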
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223727251

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val actualSchema =
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-      val firstLine = maybeFirstLine.get
-      val parser = new CsvParser(parsedOptions.asParserSettings)
-      val columnNames = parser.parseLine(firstLine)
-      CSVDataSource.checkHeaderColumnNames(
+    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+      val headerChecker = new CSVHeaderChecker(
         actualSchema,
-        columnNames,
-        csvDataset.getClass.getCanonicalName,
-        parsedOptions.enforceSchema,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        parsedOptions,
+        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+      headerChecker.checkHeaderColumnNames(firstLine)
       filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
-    } else {
-      filteredLines.rdd
-    }
+    }.getOrElse(filteredLines.rdd)
--- End diff --

It is not directly related to your changes, but just in case: why do we convert the `Dataset` to an `RDD` here?
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223741951

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
     maybeFirstLine.map(csvParser.parseLine(_)) match {
       case Some(firstRow) if firstRow != null =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --

What about importing it from `CSVUtils`? What is the reason for the prefix here?
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223698787

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val actualSchema =
       StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-    val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-      val firstLine = maybeFirstLine.get
-      val parser = new CsvParser(parsedOptions.asParserSettings)
-      val columnNames = parser.parseLine(firstLine)
-      CSVDataSource.checkHeaderColumnNames(
+    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+      val headerChecker = new CSVHeaderChecker(
         actualSchema,
-        columnNames,
-        csvDataset.getClass.getCanonicalName,
-        parsedOptions.enforceSchema,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
+        parsedOptions,
+        source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --

Would it be better to output more concrete information about the dataset? For example, `toString` at least outputs the field names. I think it would help with log analysis.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223728765

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

It's under the `execution` package, which is meant to be private. Since it's accessed in `DataFrameReader`, it would have to be `private[sql]`, and such modifiers were removed in SPARK-16964 for this reason.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223722902

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Can this be private to the `csv` or `spark` package? Or is it now part of a public API?
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594430

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
+    schema: StructType,
+    options: CSVOptions,
+    source: String,
+    isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
+    if (columnNames != null) {
+      val fieldNames = schema.map(_.name).toIndexedSeq
+      val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
+      var errorMessage: Option[String] = None
+
+      if (headerLen == schemaSize) {
+        var i = 0
+        while (errorMessage.isEmpty && i < headerLen) {
+          var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
+          if (!caseSensitive) {
+            // scalastyle:off caselocale
+            nameInSchema = nameInSchema.toLowerCase
+            nameInHeader = nameInHeader.toLowerCase
+            // scalastyle:on caselocale
+          }
+          if (nameInHeader != nameInSchema) {
+            errorMessage = Some(
+              s"""|CSV header does not conform to the schema.
+                  | Header: ${columnNames.mkString(", ")}
+                  | Schema: ${fieldNames.mkString(", ")}
+                  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
+                  |$source""".stripMargin)
--- End diff --

This is the only difference. Previously it was

```
|CSV file: $fileName""".stripMargin)
```

which ended up reporting the source's class name as if it were a CSV file name here. See https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512. This is the only change in this method.
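The comparison logic quoted above (optional lower-casing, first mismatching position, a multi-line error message ending with `$source`) can be exercised in isolation. This is a hypothetical standalone sketch, not `CSVHeaderChecker` itself: it lower-cases with `Locale.ROOT` instead of the default locale, uses its own wording for a length mismatch, and assumes that with `enforceSchema` enabled a mismatch is only passed to a `warn` callback rather than thrown, based on the quoted comment that column names are then ignored.

```scala
import java.util.Locale

// Hypothetical standalone header check; names and the warn/throw policy are assumptions.
object HeaderCheckSketch {
  /** Returns an error message if the header does not match the schema's field names. */
  def headerMismatch(
      fieldNames: IndexedSeq[String],
      columnNames: IndexedSeq[String],
      caseSensitive: Boolean,
      source: String): Option[String] = {
    // Optional case folding, mirroring the `caseSensitive` switch in the quoted code.
    def norm(name: String): String =
      if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

    if (columnNames.length != fieldNames.length) {
      // Hypothetical wording; the quoted diff does not show this branch.
      Some(s"Header has ${columnNames.length} columns but the schema has ${fieldNames.length} fields\n$source")
    } else {
      // First position whose names differ, if any.
      fieldNames.indices
        .find(i => norm(fieldNames(i)) != norm(columnNames(i)))
        .map { i =>
          s"""|CSV header does not conform to the schema.
              | Header: ${columnNames.mkString(", ")}
              | Schema: ${fieldNames.mkString(", ")}
              |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
              |$source""".stripMargin
        }
    }
  }

  /** Assumed policy: report via `warn` when enforceSchema is on, otherwise fail. */
  def check(
      fieldNames: IndexedSeq[String],
      columnNames: IndexedSeq[String],
      caseSensitive: Boolean,
      enforceSchema: Boolean,
      source: String,
      warn: String => Unit): Unit =
    headerMismatch(fieldNames, columnNames, caseSensitive, source).foreach { msg =>
      if (enforceSchema) warn(msg) else throw new IllegalArgumentException(msg)
    }
}
```

Because `source` is the last line of the message, the same check produces "CSV file: ..." or "CSV source: ..." depending only on what the caller passes in.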
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594011

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *                      if unknown or not applicable (for instance when the input is a dataset),
+ *                      can be omitted.
+ */
+class CSVHeaderChecker(
+    schema: StructType,
+    options: CSVOptions,
+    source: String,
+    isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
--- End diff --

It was moved as is, except for the parameters in its signature.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593838

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser): Iterator[Array[String]] = {
-    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+    val handleHeader: () => Unit =
+      () => if (shouldDropHeader) tokenizer.parseNext
+
+    convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
   /**
    * Parses a stream that contains CSV strings and turns it into an iterator of rows.
    */
   def parseStream(
       inputStream: InputStream,
-      shouldDropHeader: Boolean,
       parser: UnivocityParser,
-      schema: StructType,
-      checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+      headerChecker: CSVHeaderChecker,
+      schema: StructType): Iterator[InternalRow] = {
     val tokenizer = parser.tokenizer
     val safeParser = new FailureSafeParser[Array[String]](
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord,
       parser.options.multiLine)
-    convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+    val handleHeader: () => Unit =
+      () => headerChecker.checkHeaderColumnNames(tokenizer)
--- End diff --

This keeps the code structure of `parseStream` and `parseIterator` consistent; they are used in multiLine and non-multiLine mode respectively.
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593894

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala ---
@@ -90,6 +89,49 @@ object CSVUtils {
         None
       }
     }
+
+  /**
+   * Generates a header from the given row which is null-safe and duplicate-safe.
+   */
+  def makeSafeHeader(
--- End diff --

It's moved as is.
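The doc comment above says the generated header is "null-safe and duplicate-safe". A minimal sketch of what that can mean follows; it is not the moved `makeSafeHeader`. `SafeHeaderSketch`, the `nullValue` parameter, and the naming scheme (`_c<index>` for missing names, `<name><index>` for duplicates) are assumptions for illustration, so check the moved code for the exact rules.

```scala
import java.util.Locale

// Hypothetical sketch of a null-safe, duplicate-safe header; not the moved Spark code.
object SafeHeaderSketch {
  def makeSafeHeader(
      row: Array[String],
      caseSensitive: Boolean,
      nullValue: String = ""): Array[String] = {
    def key(name: String): String =
      if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
    // Names (after optional lower-casing) that occur more than once, ignoring nulls.
    val duplicates: Set[String] = row.filter(_ != null).map(key)
      .groupBy(identity)
      .collect { case (k, occurrences) if occurrences.length > 1 => k }
      .toSet
    row.zipWithIndex.map { case (name, index) =>
      if (name == null || name.isEmpty || name == nullValue) {
        s"_c$index"    // missing name: fall back to a positional name
      } else if (duplicates.contains(key(name))) {
        s"$name$index" // duplicate name: disambiguate with its position
      } else {
        name
      }
    }
  }
}
```

For example, `Array("a", null, "b", "a")` becomes `a0, _c1, b, a3` when compared case-sensitively.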
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593701

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser): Iterator[Array[String]] = {
-    convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+    val handleHeader: () => Unit =
+      () => if (shouldDropHeader) tokenizer.parseNext
--- End diff --

This is used in the schema inference path, where we don't check the header; here it only drops the header.
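The `convertStream` shape in this diff takes header handling as a `() => Unit` thunk, so each caller decides what happens to the header: the schema inference path only drops it, while `parseStream` checks it. The sketch below is a hypothetical, simplified stand-in: an `Iterator[Array[String]]` replaces the univocity `CsvParser`, and `ConvertStreamSketch` and `inferenceTokens` are illustrative names, not Spark's.

```scala
// Hypothetical sketch of the convertStream shape; a plain iterator stands in for the tokenizer.
object ConvertStreamSketch {
  def convertStream[T](
      records: Iterator[Array[String]],
      handleHeader: () => Unit)(
      convert: Array[String] => T): Iterator[T] = new Iterator[T] {
    // Runs once, when the iterator is constructed: the input is already available and
    // no data record has been handed out yet, so the thunk may consume the header.
    handleHeader()

    override def hasNext: Boolean = records.hasNext
    override def next(): T = convert(records.next())
  }

  // Schema-inference style caller: the thunk only drops the header and never checks it.
  def inferenceTokens(records: Iterator[Array[String]], shouldDropHeader: Boolean): List[String] = {
    val dropOnly: () => Unit = () => {
      if (shouldDropHeader && records.hasNext) records.next()
      ()
    }
    convertStream(records, dropOnly)(_.mkString(",")).toList
  }
}
```

A checking caller would pass a different thunk (for instance one that reads the first record and compares it with the schema), leaving `convertStream` itself unaware of headers.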
[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22676

[SPARK-25684][SQL] Organize header related codes in CSV datasource

## What changes were proposed in this pull request?

1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).

    Rationale:
    - Historically, when I first refactored this code, I intended to put all CSV-specific handling (such as options), filtering, header extraction, etc. in `CSVUtils`.
    - See `JsonDataSource`. With this change, `CSVDataSource` becomes quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, it is better to keep the two as closely matched as we can.

2. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).

    Rationale:
    - Similar reasons to 1.

3. Put the `enforceSchema` logic into `CSVHeaderChecker`.

    - The header checking and column pruning logic were added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as that from https://github.com/apache/spark/pull/21296, is duplicated.
    - Also, the header-checking code is scattered here and there, which is quite error-prone. It is better to put it in a single place. See https://github.com/apache/spark/pull/22656.

## How was this patch tested?

Existing tests should cover this.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark refactoring-csv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22676.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22676

commit 56906680ab7d5d63be04bac2c3a19bb52baa3025
Author: hyukjinkwon
Date: 2018-10-09T07:26:08Z

Organize header related codes in CSV datasource