[spark] branch branch-2.4 updated: [SPARK-28818][SQL][2.4] Respect source column nullability in the arrays created by `freqItems()`

2020-08-02 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 91f2a25  [SPARK-28818][SQL][2.4] Respect source column nullability in 
the arrays created by `freqItems()`
91f2a25 is described below

commit 91f2a2548ad0f825fc4b5c67264e11abb76bbd9d
Author: Matt Hawes 
AuthorDate: Mon Aug 3 08:55:28 2020 +0900

[SPARK-28818][SQL][2.4] Respect source column nullability in the arrays 
created by `freqItems()`

### What changes were proposed in this pull request?
This PR replaces the hard-coded non-nullability of the array elements 
returned by `freqItems()` with a nullability that reflects the original schema. 
Essentially [the functional 
change](https://github.com/apache/spark/pull/25575/files#diff-bf59bb9f3dc351f5bf6624e5edd2dcf4R122)
 to the schema generation is:
```
StructField(name + "_freqItems", ArrayType(dataType, false))
```
Becomes:
```
StructField(name + "_freqItems", ArrayType(dataType, originalField.nullable))
```

Respecting the original nullability prevents issues when Spark depends on
`ArrayType`'s `containsNull` being accurate. The example that uncovered this is
calling `collect()` on the DataFrame (see the
[ticket](https://issues.apache.org/jira/browse/SPARK-28818) for a full repro),
though it's likely that there are several other places where this could cause a problem.
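For illustration, a minimal repro sketch of the failure (assuming a Spark 2.4 `SparkSession` named `spark`; the column name and data are made up, not taken from the ticket):
```
import spark.implicits._

// A nullable source column whose frequent values include null.
val df = Seq(Some("a"), Some("a"), None, None).toDF("word")

// Pre-fix, the result schema hard-coded ArrayType(..., containsNull = false),
// so materializing a frequent-items array that actually contains null could
// fail (e.g. with a NullPointerException) during collect(). Post-fix, the
// array's containsNull mirrors the source column's nullability.
df.stat.freqItems(Seq("word")).collect()
```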

I've also refactored a small amount of the surrounding code to remove some 
unnecessary steps and group together related operations.

Note: This is the backport PR of #25575 and the credit should go to MGHawes.

### Why are the changes needed?
It fixes a bug that currently prevents users from calling
`df.stat.freqItems(...).collect()` on nullable columns, and the inaccurate
`containsNull` could potentially cause other, as yet unknown, issues.

### Does this PR introduce any user-facing change?
Yes. After the change, the nullability of the source columns is respected in
the arrays returned by `freqItems()`.
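A hedged sketch of the user-visible difference (illustrative names; assumes a session `spark`):
```
import spark.implicits._

val df = Seq(Some(1), None).toDF("num")  // num: int (nullable = true)

// After this change the element type reports containsNull = true, matching
// the source column; before it, it always reported containsNull = false.
df.stat.freqItems(Seq("num")).printSchema()
```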

### How was this patch tested?
I added a test that specifically checks that the nullability is carried
through, and that explicitly calls `collect()` to catch the exact regression
that was observed. I also ran the test against the old version of the code and
it fails as expected.
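A sketch of what such a regression test could look like (the actual test added to `DataFrameStatSuite` likely differs in naming and data):
```
import org.apache.spark.sql.types.ArrayType
import spark.implicits._

val df = Seq(Some("a"), Some("a"), None).toDF("word")
val result = df.stat.freqItems(Seq("word"))

// The output array's containsNull should mirror the source column's
// nullability...
val arrayType = result.schema("word_freqItems").dataType.asInstanceOf[ArrayType]
assert(arrayType.containsNull == df.schema("word").nullable)

// ...and collect() should succeed even when null is among the frequent items.
result.collect()
```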

Closes #29327 from maropu/SPARK-28818-2.4.

Lead-authored-by: Matt Hawes 
Co-authored-by: Takeshi Yamamuro 
Signed-off-by: Takeshi Yamamuro 
---
 .../spark/sql/execution/stat/FrequentItems.scala   | 19 ++++++++++---------
 .../org/apache/spark/sql/DataFrameStatSuite.scala  | 26 +++++++++++++++++++++++++-
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 86f6307..f21efd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -89,11 +89,6 @@ object FrequentItems extends Logging {
     // number of max items to keep counts for
     val sizeOfMap = (1 / support).toInt
     val countMaps = Seq.tabulate(numCols)(i => new FreqItemCounter(sizeOfMap))
-    val originalSchema = df.schema
-    val colInfo: Array[(String, DataType)] = cols.map { name =>
-      val index = originalSchema.fieldIndex(name)
-      (name, originalSchema.fields(index).dataType)
-    }.toArray
 
     val freqItems = df.select(cols.map(Column(_)) : _*).rdd.treeAggregate(countMaps)(
       seqOp = (counts, row) => {
@@ -117,10 +112,16 @@ object FrequentItems extends Logging {
     )
     val justItems = freqItems.map(m => m.baseMap.keys.toArray)
     val resultRow = Row(justItems : _*)
-    // append frequent Items to the column name for easy debugging
-    val outputCols = colInfo.map { v =>
-      StructField(v._1 + "_freqItems", ArrayType(v._2, false))
-    }
+
+    val originalSchema = df.schema
+    val outputCols = cols.map { name =>
+      val index = originalSchema.fieldIndex(name)
+      val originalField = originalSchema.fields(index)
+
+      // append frequent Items to the column name for easy debugging
+      StructField(name + "_freqItems", ArrayType(originalField.dataType, originalField.nullable))
+    }.toArray
+
     val schema = StructType(outputCols).toAttributes
     Dataset.ofRows(df.sparkSession, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 8eae353..23a1fc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameS
