Repository: spark
Updated Branches:
  refs/heads/master 63ca4bbe7 -> 76813cfa1


[SPARK-25950][SQL] from_csv should respect 
spark.sql.columnNameOfCorruptRecord

## What changes were proposed in this pull request?

Fix for `CsvToStructs` to take into account SQL config 
`spark.sql.columnNameOfCorruptRecord` similar to `from_json`.

## How was this patch tested?

Added a new test where `spark.sql.columnNameOfCorruptRecord` is set to a 
corrupt column name different from the default.

Closes #22956 from MaxGekk/csv-tests.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76813cfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76813cfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76813cfa

Branch: refs/heads/master
Commit: 76813cfa1e2607ea3b669a79e59b568e96395b2e
Parents: 63ca4bb
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Wed Nov 7 11:26:17 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Wed Nov 7 11:26:17 2018 +0800

----------------------------------------------------------------------
 .../catalyst/expressions/csvExpressions.scala   |  9 +++++-
 .../apache/spark/sql/CsvFunctionsSuite.scala    | 31 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76813cfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 74b670a..aff372b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.csv._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -92,8 +93,14 @@ case class CsvToStructs(
     }
   }
 
+  val nameOfCorruptRecord = 
SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
   @transient lazy val parser = {
-    val parsedOptions = new CSVOptions(options, columnPruning = true, 
timeZoneId.get)
+    val parsedOptions = new CSVOptions(
+      options,
+      columnPruning = true,
+      defaultTimeZoneId = timeZoneId.get,
+      defaultColumnNameOfCorruptRecord = nameOfCorruptRecord)
     val mode = parsedOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
       throw new AnalysisException(s"from_csv() doesn't support the 
${mode.name} mode. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/76813cfa/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index eb6b248..1dd8ec3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -86,4 +88,33 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
     checkAnswer(df.select(to_csv($"a", options)), Row("26/08/2015 18:00") :: 
Nil)
   }
+
+  test("from_csv invalid csv - check modes") {
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      val schema = new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)
+        .add("_unparsed", StringType)
+      val badRec = "\""
+      val df = Seq(badRec, "2,12").toDS()
+
+      checkAnswer(
+        df.select(from_csv($"value", schema, Map("mode" -> "PERMISSIVE"))),
+        Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)
+
+      val exception1 = intercept[SparkException] {
+        df.select(from_csv($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+      }.getMessage
+      assert(exception1.contains(
+        "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+      val exception2 = intercept[SparkException] {
+        df.select(from_csv($"value", schema, Map("mode" -> "DROPMALFORMED")))
+          .collect()
+      }.getMessage
+      assert(exception2.contains(
+        "from_csv() doesn't support the DROPMALFORMED mode. " +
+          "Acceptable modes are PERMISSIVE and FAILFAST."))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to