This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 801d6a9 [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified 801d6a9 is described below commit 801d6a92d958f7b9762466e3c6643e54c48eb3a2 Author: Zhenhua Wang <wzh_...@163.com> AuthorDate: Sun Mar 29 13:30:14 2020 +0900 [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified ### What changes were proposed in this pull request? SPARK-25387 avoids npe for bad csv input, but when reading bad csv input with `columnNameCorruptRecord` specified, `getCurrentInput` is called and it still throws npe. ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Add a test. Closes #28029 from wzhfy/corrupt_column_npe. Authored-by: Zhenhua Wang <wzh_...@163.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../sql/execution/datasources/csv/UnivocityParser.scala | 3 ++- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index e847e40..5579e95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -82,7 +82,8 @@ class UnivocityParser( // Retrieve the raw record string. private def getCurrentInput: UTF8String = { - UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) + val currentContent = tokenizer.getContext.currentParsedContent() + if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd) } // This parser first picks some tokens from the input tokens, according to the required schema, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2ea8f4f..866d8de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1822,6 +1822,20 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(spark.read.csv(input).collect().toSet == Set(Row())) } + test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") { + val schema = StructType( + StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil) + val input = spark.createDataset(Seq("\u0000\u0000\u0001234")) + + checkAnswer( + spark.read + .option("columnNameOfCorruptRecord", "_corrupt_record") + .schema(schema) + .csv(input), + Row(null, null)) + assert(spark.read.csv(input).collect().toSet == Set(Row())) + } + test("field names of inferred schema shouldn't compare to the first row") { val input = Seq("1,2").toDS() val df = spark.read.option("enforceSchema", false).csv(input) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org