[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22676


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223751038
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
   def parseIterator(
   lines: Iterator[String],
   parser: UnivocityParser,
+  headerChecker: CSVHeaderChecker,
   schema: StructType): Iterator[InternalRow] = {
+headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --

Ditto. It was already done this way. Let's keep the original path as is, since this PR only aims to organize the code.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223749938
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
+
+convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
 
   /**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
   def parseStream(
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   parser: UnivocityParser,
-  schema: StructType,
-  checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+  headerChecker: CSVHeaderChecker,
+  schema: StructType): Iterator[InternalRow] = {
 val tokenizer = parser.tokenizer
 val safeParser = new FailureSafeParser[Array[String]](
   input => Seq(parser.convert(input)),
   parser.options.parseMode,
   schema,
   parser.options.columnNameOfCorruptRecord,
   parser.options.multiLine)
-convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+val handleHeader: () => Unit =
+  () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+convertStream(inputStream, tokenizer, handleHeader) { tokens =>
   safeParser.parse(tokens)
 }.flatten
   }
 
   private def convertStream[T](
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   tokenizer: CsvParser,
-  checkHeader: Array[String] => Unit = _ => ())(
+  handleHeader: () => Unit)(
   convert: Array[String] => T) = new Iterator[T] {
 tokenizer.beginParsing(inputStream)
-private var nextRecord = {
-  if (shouldDropHeader) {
-val firstRecord = tokenizer.parseNext()
-checkHeader(firstRecord)
-  }
-  tokenizer.parseNext()
-}
+
+// We can handle header here since here the stream is open.
+handleHeader()
--- End diff --

It is, but I guess it was already done this way.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223749425
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Let's leave it as is. It follows the existing naming convention within each datasource.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223748041
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-  val firstLine = maybeFirstLine.get
-  val parser = new CsvParser(parsedOptions.asParserSettings)
-  val columnNames = parser.parseLine(firstLine)
-  CSVDataSource.checkHeaderColumnNames(
+val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+  val headerChecker = new CSVHeaderChecker(
 actualSchema,
-columnNames,
-csvDataset.getClass.getCanonicalName,
-parsedOptions.enforceSchema,
-sparkSession.sessionState.conf.caseSensitiveAnalysis)
+parsedOptions,
+source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+  headerChecker.checkHeaderColumnNames(firstLine)
   filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
-} else {
-  filteredLines.rdd
-}
+}.getOrElse(filteredLines.rdd)
--- End diff --

I don't remember exactly. It looks like we can change it to `Dataset`.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223744025
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-  val firstLine = maybeFirstLine.get
-  val parser = new CsvParser(parsedOptions.asParserSettings)
-  val columnNames = parser.parseLine(firstLine)
-  CSVDataSource.checkHeaderColumnNames(
+val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+  val headerChecker = new CSVHeaderChecker(
 actualSchema,
-columnNames,
-csvDataset.getClass.getCanonicalName,
-parsedOptions.enforceSchema,
-sparkSession.sessionState.conf.caseSensitiveAnalysis)
+parsedOptions,
+source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --

Makes sense. If that's just `toString`, of course I can fix it here since the change is small, although it's orthogonal.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223743548
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
 maybeFirstLine.map(csvParser.parseLine(_)) match {
   case Some(firstRow) if firstRow != null =>
 val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --

Because most of the code here uses the `CSVUtils...` form; I just followed that.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223741059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -330,7 +333,10 @@ private[csv] object UnivocityParser {
   def parseIterator(
   lines: Iterator[String],
   parser: UnivocityParser,
+  headerChecker: CSVHeaderChecker,
   schema: StructType): Iterator[InternalRow] = {
+headerChecker.checkHeaderColumnNames(lines, parser.tokenizer)
--- End diff --

The same question here. I would prefer to consume the input iterator lazily. That is one of the advantages of iterators over collections: they perform an action only when you explicitly call `hasNext` or `next`.
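
To make the distinction concrete, here is a minimal sketch (with a hypothetical `checkHeader` helper, not the PR's code) of eager vs. lazy consumption of the header line:

```scala
// Hypothetical stand-in for the real header check.
def checkHeader(header: String): Unit = require(header.nonEmpty, "empty header")

// Eager: the header line is consumed as soon as this method is called,
// even if the returned iterator is never used.
def checkedEagerly(lines: Iterator[String]): Iterator[String] = {
  if (lines.hasNext) checkHeader(lines.next())
  lines
}

// Lazy: nothing is read from `lines` until the first hasNext/next call.
def checkedLazily(lines: Iterator[String]): Iterator[String] =
  new Iterator[String] {
    private lazy val rest = { if (lines.hasNext) checkHeader(lines.next()); lines }
    def hasNext: Boolean = rest.hasNext
    def next(): String = rest.next()
  }
```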


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223730392
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
@@ -139,14 +138,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
 StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
 parsedOptions)
+  val schema = if (columnPruning) requiredSchema else dataSchema
+  val headerChecker = new CSVHeaderChecker(
+schema, parsedOptions, source = s"CSV file: ${file.filePath}", file.start == 0)
--- End diff --

`isStartOfFile = file.start == 0`
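
That is, passing the flag as a named argument. A generic illustration (hypothetical names, not the PR's code) of why this reads better at call sites:

```scala
// A bare boolean expression is opaque at the call site;
// naming the parameter documents what the boolean means.
def describe(source: String, isStartOfFile: Boolean = false): String =
  s"$source (isStartOfFile = $isStartOfFile)"

println(describe("CSV file: /tmp/a.csv", isStartOfFile = true))
```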


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223737261
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +273,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
+
+convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
 
   /**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
   def parseStream(
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   parser: UnivocityParser,
-  schema: StructType,
-  checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+  headerChecker: CSVHeaderChecker,
+  schema: StructType): Iterator[InternalRow] = {
 val tokenizer = parser.tokenizer
 val safeParser = new FailureSafeParser[Array[String]](
   input => Seq(parser.convert(input)),
   parser.options.parseMode,
   schema,
   parser.options.columnNameOfCorruptRecord,
   parser.options.multiLine)
-convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+val handleHeader: () => Unit =
+  () => headerChecker.checkHeaderColumnNames(tokenizer)
+
+convertStream(inputStream, tokenizer, handleHeader) { tokens =>
   safeParser.parse(tokens)
 }.flatten
   }
 
   private def convertStream[T](
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   tokenizer: CsvParser,
-  checkHeader: Array[String] => Unit = _ => ())(
+  handleHeader: () => Unit)(
   convert: Array[String] => T) = new Iterator[T] {
 tokenizer.beginParsing(inputStream)
-private var nextRecord = {
-  if (shouldDropHeader) {
-val firstRecord = tokenizer.parseNext()
-checkHeader(firstRecord)
-  }
-  tokenizer.parseNext()
-}
+
+// We can handle header here since here the stream is open.
+handleHeader()
--- End diff --

It looks slightly strange that we consume data from the input before the 
upper layer starts reading it.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223727251
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-  val firstLine = maybeFirstLine.get
-  val parser = new CsvParser(parsedOptions.asParserSettings)
-  val columnNames = parser.parseLine(firstLine)
-  CSVDataSource.checkHeaderColumnNames(
+val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+  val headerChecker = new CSVHeaderChecker(
 actualSchema,
-columnNames,
-csvDataset.getClass.getCanonicalName,
-parsedOptions.enforceSchema,
-sparkSession.sessionState.conf.caseSensitiveAnalysis)
+parsedOptions,
+source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
+  headerChecker.checkHeaderColumnNames(firstLine)
   filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
-} else {
-  filteredLines.rdd
-}
+}.getOrElse(filteredLines.rdd)
--- End diff --

It is not directly related to your changes. Just in case, why do we convert 
`Dataset` to `RDD` here?


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223729530
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Is the `CSV` prefix of `CSVHeaderChecker` necessary? The class is already in the `csv` package, so it should be clear that it checks CSV headers.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223741951
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---
@@ -251,7 +125,7 @@ object TextInputCSVDataSource extends CSVDataSource {
 maybeFirstLine.map(csvParser.parseLine(_)) match {
   case Some(firstRow) if firstRow != null =>
 val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
--- End diff --

What about importing it from `CSVUtils`? What is the reason to have the prefix here?


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223698787
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -505,20 +505,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 val actualSchema =
   StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
 
-val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) {
-  val firstLine = maybeFirstLine.get
-  val parser = new CsvParser(parsedOptions.asParserSettings)
-  val columnNames = parser.parseLine(firstLine)
-  CSVDataSource.checkHeaderColumnNames(
+val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+  val headerChecker = new CSVHeaderChecker(
 actualSchema,
-columnNames,
-csvDataset.getClass.getCanonicalName,
-parsedOptions.enforceSchema,
-sparkSession.sessionState.conf.caseSensitiveAnalysis)
+parsedOptions,
+source = s"CSV source: ${csvDataset.getClass.getCanonicalName}")
--- End diff --

Would it be better to output more concrete info about the dataset? For example, `toString` outputs at least the field names. I think it would help in log analysis.
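
For context, a small illustration (assuming a running `SparkSession` named `spark`) of the difference between the two strings:

```scala
// Dataset.toString renders a schema summary, which identifies the input
// more concretely than the canonical class name used today.
val ds = spark.range(1).selectExpr("id", "cast(id as string) as name")
println(ds.toString)                  // [id: bigint, name: string]
println(ds.getClass.getCanonicalName) // org.apache.spark.sql.Dataset
```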


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223728765
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

It's under the `execution` package, which is meant to be private. Since it's accessed from `DataFrameReader`, it would have to be `private[sql]` anyway, and that modifier was removed for classes under this package in SPARK-16964 for exactly this reason.
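
For reference, a generic sketch of Scala's package-qualified access modifier (an illustration with hypothetical names, not the PR's code):

```scala
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.types.StructType

// `private[sql]` would make the class visible anywhere under
// org.apache.spark.sql (e.g. to DataFrameReader) but hidden from user code
// outside that package. Under the execution package the modifier is treated
// as redundant (SPARK-16964), since the whole package is internal.
private[sql] class ExampleChecker(schema: StructType) {
  def columnCount: Int = schema.length
}
```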


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223722902
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
--- End diff --

Can this be private to the csv or spark packages? Or is this now part of a public API?


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594430
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
+schema: StructType,
+options: CSVOptions,
+source: String,
+isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
+if (columnNames != null) {
+  val fieldNames = schema.map(_.name).toIndexedSeq
+  val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)
+  var errorMessage: Option[String] = None
+
+  if (headerLen == schemaSize) {
+var i = 0
+while (errorMessage.isEmpty && i < headerLen) {
+  var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
+  if (!caseSensitive) {
+// scalastyle:off caselocale
+nameInSchema = nameInSchema.toLowerCase
+nameInHeader = nameInHeader.toLowerCase
+// scalastyle:on caselocale
+  }
+  if (nameInHeader != nameInSchema) {
+errorMessage = Some(
+  s"""|CSV header does not conform to the schema.
+  | Header: ${columnNames.mkString(", ")}
+  | Schema: ${fieldNames.mkString(", ")}
+  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
+  |$source""".stripMargin)
--- End diff --

Only this part differs. Previously it was

```
 |CSV file: $fileName""".stripMargin)
```

which ended up producing the class name of the source here. See
https://github.com/apache/spark/pull/22676/files#diff-f70bda59304588cc3abfa3a9840653f4R512.

This is the only diff in this method.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223594011
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVHeaderChecker.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import com.univocity.parsers.csv.CsvParser
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Checks that column names in a CSV header and field names in the schema are the same
+ * by taking into account case sensitivity.
+ *
+ * @param schema provided (or inferred) schema to which CSV must conform.
+ * @param options parsed CSV options.
+ * @param source name of CSV source that are currently checked. It is used in error messages.
+ * @param isStartOfFile indicates if the currently processing partition is the start of the file.
+ *  if unknown or not applicable (for instance when the input is a dataset),
+ *  can be omitted.
+ */
+class CSVHeaderChecker(
+schema: StructType,
+options: CSVOptions,
+source: String,
+isStartOfFile: Boolean = false) extends Logging {
+
+  // Indicates if it is set to `false`, comparison of column names and schema field
+  // names is not case sensitive.
+  private val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+
+  // Indicates if it is `true`, column names are ignored otherwise the CSV column
+  // names are checked for conformance to the schema. In the case if
+  // the column name don't conform to the schema, an exception is thrown.
+  private val enforceSchema = options.enforceSchema
+
+  /**
+   * Checks that column names in a CSV header and field names in the schema are the same
+   * by taking into account case sensitivity.
+   *
+   * @param columnNames names of CSV columns that must be checked against to the schema.
+   */
+  private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
--- End diff --

It's moved as-is, except for the parameters in its signature.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593838
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
+
+convertStream(inputStream, tokenizer, handleHeader)(tokens => tokens)
   }
 
   /**
* Parses a stream that contains CSV strings and turns it into an iterator of rows.
*/
   def parseStream(
   inputStream: InputStream,
-  shouldDropHeader: Boolean,
   parser: UnivocityParser,
-  schema: StructType,
-  checkHeader: Array[String] => Unit): Iterator[InternalRow] = {
+  headerChecker: CSVHeaderChecker,
+  schema: StructType): Iterator[InternalRow] = {
 val tokenizer = parser.tokenizer
 val safeParser = new FailureSafeParser[Array[String]](
   input => Seq(parser.convert(input)),
   parser.options.parseMode,
   schema,
   parser.options.columnNameOfCorruptRecord,
   parser.options.multiLine)
-convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
+
+val handleHeader: () => Unit =
+  () => headerChecker.checkHeaderColumnNames(tokenizer)
--- End diff --

This matches the code structure of `parseStream` and `parseIterator`, which are used in the multiLine and non-multiLine modes.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593894
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala ---
@@ -90,6 +89,49 @@ object CSVUtils {
   None
 }
   }
+
+  /**
+   * Generates a header from the given row which is null-safe and duplicate-safe.
+   */
+  def makeSafeHeader(
--- End diff --

It's moved as-is.


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22676#discussion_r223593701
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
@@ -273,44 +274,47 @@ private[csv] object UnivocityParser {
   inputStream: InputStream,
   shouldDropHeader: Boolean,
   tokenizer: CsvParser): Iterator[Array[String]] = {
-convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
+val handleHeader: () => Unit =
+  () => if (shouldDropHeader) tokenizer.parseNext
--- End diff --

This is used in the schema inference path, where we don't check the header; here it only drops the header.
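
A minimal sketch (hypothetical names, simplified from the diff) of how the `() => Unit` thunk lets both call sites share one code path while doing different things with the header:

```scala
// The caller packages its header action as a thunk; convertAll runs it
// exactly once, before any record is converted.
def convertAll[T](records: Iterator[String], handleHeader: () => Unit)(
    convert: String => T): Iterator[T] = {
  handleHeader()
  records.map(convert)
}

val data = Iterator("id,name", "1,a", "2,b")

// Schema inference path: only drop the header, no checking.
val dropOnly: () => Unit = () => if (data.hasNext) data.next()

convertAll(data, dropOnly)(_.split(',').toList).foreach(println)
// List(1, a)
// List(2, b)
```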


---




[GitHub] spark pull request #22676: [SPARK-25684][SQL] Organize header related codes ...

2018-10-09 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/22676

[SPARK-25684][SQL] Organize header related codes in CSV datasource

## What changes were proposed in this pull request?

1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).
  Rationale:
- Historically, and since the first refactoring (which I did), the intent has been to put all CSV-specific handling here: options, filtering, header extraction, etc.
- See `JsonDataSource`: `CSVDataSource` is now quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we had better match them as much as we can.

2. Move `CSVDataSource.checkHeaderColumnNames` to 
`CSVHeaderChecker.checkHeaderColumnNames` (as is).
Rationale:
  - Similar reasons as in 1.

3. Put the `enforceSchema` logic into `CSVHeaderChecker`.
  - The header checking and column pruning logic was added (per https://github.com/apache/spark/pull/20894 and https://github.com/apache/spark/pull/21296), but some of the code, such as https://github.com/apache/spark/pull/21296, is duplicated.
  - Also, the header checking code is scattered here and there, which is quite error-prone; we had better put it in a single place (see https://github.com/apache/spark/pull/22656). A rough sketch of the consolidated check is shown below.
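
As a rough, self-contained miniature of the consolidated check (a simplification: the real `CSVHeaderChecker` also reads case sensitivity from `SQLConf` and consults the `enforceSchema` option before raising an error):

```scala
def checkHeader(columnNames: Array[String], fieldNames: Seq[String], source: String): Unit = {
  val errorMessage: Option[String] =
    if (columnNames.length != fieldNames.length) {
      Some(
        s"""|CSV header does not conform to the schema.
            | Header: ${columnNames.mkString(", ")}
            | Schema: ${fieldNames.mkString(", ")}
            |$source""".stripMargin)
    } else {
      columnNames.zip(fieldNames).collectFirst {
        // Case-insensitive comparison, mirroring caseSensitive = false.
        case (h, f) if !h.equalsIgnoreCase(f) => s"Expected: $f but found: $h ($source)"
      }
    }
  errorMessage.foreach(msg => throw new IllegalArgumentException(msg))
}

checkHeader(Array("id", "NAME"), Seq("id", "name"), "CSV file: /tmp/people.csv") // passes
```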

## How was this patch tested?

Existing tests should cover this.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark refactoring-csv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22676


commit 56906680ab7d5d63be04bac2c3a19bb52baa3025
Author: hyukjinkwon 
Date:   2018-10-09T07:26:08Z

Organize header related codes in CSV datasource




---
