[spark] branch branch-3.0 updated: [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

2020-03-28 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 38c262b  [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
38c262b is described below

commit 38c262b97e4fa10b249d51d3f69a0c97760492a3
Author: Zhenhua Wang 
AuthorDate: Sun Mar 29 13:30:14 2020 +0900

[SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

### What changes were proposed in this pull request?

SPARK-25387 avoids an NPE for bad CSV input, but when bad CSV input is read with `columnNameOfCorruptRecord` specified, `getCurrentInput` is still called and still throws an NPE, because the parser's current content is null for such records.
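
A minimal reproduction sketch (assumptions: a live `SparkSession` bound to a `val spark`, and the option name `columnNameOfCorruptRecord` as used in the test below). Before this fix the read threw a `NullPointerException`; after it, the malformed line yields a row of nulls:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import spark.implicits._  // assumes `spark` is a SparkSession held in a val

// Explicit schema with a corrupt-record column.
val schema = StructType(
  StructField("a", IntegerType) ::
  StructField("_corrupt_record", StringType) :: Nil)

// Malformed input: uniVocity reports no parsed content for this record,
// so currentParsedContent() returns null inside getCurrentInput.
val input = Seq("\u0000\u0000\u0001234").toDS()

val df = spark.read
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv(input)

df.show()  // after the fix: one row, both columns null
```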

### Why are the changes needed?

Bug fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Add a test.

Closes #28029 from wzhfy/corrupt_column_npe.

Authored-by: Zhenhua Wang 
Signed-off-by: HyukjinKwon 
(cherry picked from commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732)
Signed-off-by: HyukjinKwon 
---
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala |  3 ++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala  | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 2c5a9d7..8e87a827 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -101,7 +101,8 @@ class UnivocityParser(
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    val currentContent = tokenizer.getContext.currentParsedContent()
+    if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd)
   }
 
   // This parser first picks some tokens from the input tokens, according to the required schema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f9a510d..366cf11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1897,6 +1897,20 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
     assert(spark.read.csv(input).collect().toSet == Set(Row()))
   }
 
+  test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
+    val schema = StructType(
+      StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
+    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+    checkAnswer(
+      spark.read
+        .option("columnNameOfCorruptRecord", "_corrupt_record")
+        .schema(schema)
+        .csv(input),
+      Row(null, null))
+    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+  }
+
   test("field names of inferred schema shouldn't compare to the first row") {
     val input = Seq("1,2").toDS()
     val df = spark.read.option("enforceSchema", false).csv(input)
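
For reference, an equivalent null-safe formulation of the guarded helper (a sketch only; the committed fix above uses an explicit `if`, which avoids an `Option` allocation on this per-record hot path, and the behavior is identical):

```scala
// Sketch: Option-based variant of the fixed getCurrentInput.
// Option(x) is None when x is null, so .orNull yields null for bad records
// instead of letting stripLineEnd dereference a null String.
private def getCurrentInput: UTF8String = {
  Option(tokenizer.getContext.currentParsedContent())
    .map(content => UTF8String.fromString(content.stripLineEnd))
    .orNull
}
```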


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


