Repository: spark Updated Branches: refs/heads/master 63ca4bbe7 -> 76813cfa1
[SPARK-25950][SQL] from_csv should respect to spark.sql.columnNameOfCorruptRecord ## What changes were proposed in this pull request? Fix for `CsvToStructs` to take into account the SQL config `spark.sql.columnNameOfCorruptRecord`, similar to `from_json`. ## How was this patch tested? Added a new test where `spark.sql.columnNameOfCorruptRecord` is set to a corrupt column name different from the default. Closes #22956 from MaxGekk/csv-tests. Authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76813cfa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76813cfa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76813cfa Branch: refs/heads/master Commit: 76813cfa1e2607ea3b669a79e59b568e96395b2e Parents: 63ca4bb Author: Maxim Gekk <max.g...@gmail.com> Authored: Wed Nov 7 11:26:17 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Wed Nov 7 11:26:17 2018 +0800 ---------------------------------------------------------------------- .../catalyst/expressions/csvExpressions.scala | 9 +++++- .../apache/spark/sql/CsvFunctionsSuite.scala | 31 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/76813cfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 74b670a..aff372b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.csv._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -92,8 +93,14 @@ case class CsvToStructs( } } + val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) + @transient lazy val parser = { - val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get) + val parsedOptions = new CSVOptions( + options, + columnPruning = true, + defaultTimeZoneId = timeZoneId.get, + defaultColumnNameOfCorruptRecord = nameOfCorruptRecord) val mode = parsedOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. 
" + http://git-wip-us.apache.org/repos/asf/spark/blob/76813cfa/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index eb6b248..1dd8ec3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql import scala.collection.JavaConverters._ +import org.apache.spark.SparkException import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -86,4 +88,33 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(df.select(to_csv($"a", options)), Row("26/08/2015 18:00") :: Nil) } + + test("from_csv invalid csv - check modes") { + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("_unparsed", StringType) + val badRec = "\"" + val df = Seq(badRec, "2,12").toDS() + + checkAnswer( + df.select(from_csv($"value", schema, Map("mode" -> "PERMISSIVE"))), + Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil) + + val exception1 = intercept[SparkException] { + df.select(from_csv($"value", schema, Map("mode" -> "FAILFAST"))).collect() + }.getMessage + assert(exception1.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + + val exception2 = intercept[SparkException] { + df.select(from_csv($"value", schema, Map("mode" -> "DROPMALFORMED"))) + .collect() + }.getMessage + assert(exception2.contains( + "from_csv() doesn't support the DROPMALFORMED mode. 
" + + "Acceptable modes are PERMISSIVE and FAILFAST.")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org