[spark] branch master updated: [SPARK-40965][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1208 ` to `FIELD_NOT_FOUND`

2022-10-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f0d7b31ad61 [SPARK-40965][SQL] Rename the error class 
`_LEGACY_ERROR_TEMP_1208 ` to `FIELD_NOT_FOUND`
f0d7b31ad61 is described below

commit f0d7b31ad617d9c112f81eef2130040adbcf454d
Author: Max Gekk 
AuthorDate: Sun Oct 30 12:29:00 2022 +0300

[SPARK-40965][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1208 ` to 
`FIELD_NOT_FOUND`

### What changes were proposed in this pull request?
In the PR, I propose to assign the proper name `FIELD_NOT_FOUND` to the
legacy error class `_LEGACY_ERROR_TEMP_1208`, and to modify the test suites to use
`checkError()`, which checks the error class name, message parameters, context, etc.
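
For illustration, a test against the renamed error class could look roughly like the sketch below. This is a hedged sketch, not the literal test from this PR: the `checkError()` parameters and the backtick quoting produced by `toSQLId` follow the diff in this commit, and the triggering query is only an assumed example.
```
// Hedged sketch: intercept the AnalysisException and assert on the new
// error class and its quoted message parameters.
val e = intercept[AnalysisException] {
  sql("SELECT named_struct('a', 1).b")
}
checkError(
  exception = e,
  errorClass = "FIELD_NOT_FOUND",
  parameters = Map("fieldName" -> "`b`", "fields" -> "`a`"))
```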

### Why are the changes needed?
A proper name improves the user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
Yes, the PR changes a user-facing error message.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *AnalysisErrorSuite"
$ build/sbt "test:testOnly *ResolveSubquerySuite"
$ build/sbt "test:testOnly *EncoderResolutionSuite"
$ build/sbt "test:testOnly *SQLQuerySuite"
```

Closes #38435 from MaxGekk/field-not-found-error-class.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |  10 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |   6 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |   5 +-
 .../catalyst/analysis/ResolveSubquerySuite.scala   |   6 +-
 .../catalyst/encoders/EncoderResolutionSuite.scala |  10 +-
 .../expressions/AttributeResolutionSuite.scala |  10 +-
 .../results/typeCoercion/native/mapZipWith.sql.out |   1 +
 .../apache/spark/sql/ColumnExpressionSuite.scala   | 107 ++---
 .../spark/sql/DataFrameSetOperationsSuite.scala|  12 ++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  20 ++--
 .../spark/sql/execution/SQLViewTestSuite.scala |  12 ++-
 .../FileMetadataStructRowIndexSuite.scala  |  12 ++-
 .../datasources/FileMetadataStructSuite.scala  |  32 +++---
 13 files changed, 154 insertions(+), 89 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 4797ee0d0d0..e45b6e3bdb6 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -356,6 +356,11 @@
 ],
 "sqlState" : "22023"
   },
+  "FIELD_NOT_FOUND" : {
+"message" : [
+  "No such struct field  in ."
+]
+  },
   "FORBIDDEN_OPERATION" : {
 "message" : [
   "The operation  is not allowed on the : 
"
@@ -2479,11 +2484,6 @@
   "The duration and time inputs to window must be an integer, long or 
string literal."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1208" : {
-"message" : [
-  "No such struct field  in ."
-]
-  },
   "_LEGACY_ERROR_TEMP_1209" : {
 "message" : [
   "Ambiguous reference to fields ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cf7e3524d5b..f97888f046b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2080,10 +2080,10 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   def noSuchStructFieldInGivenFieldsError(
   fieldName: String, fields: Array[StructField]): Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1208",
+  errorClass = "FIELD_NOT_FOUND",
   messageParameters = Map(
-"fieldName" -> fieldName,
-"fields" -> fields.map(_.name).mkString(", ")))
+"fieldName" -> toSQLId(fieldName),
+"fields" -> fields.map(f => toSQLId(f.name)).mkString(", ")))
   }
 
   def ambiguousReferenceToFieldsError(fields: String): Throwable = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index d530be5f5e4..f3bca030380 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -370,10 +370,11 @@ class AnalysisErrorSuite extends AnalysisTest {
 "Ambiguous reference to fields" :: "differentCase" :: "differentcase" :: 
Nil,
 caseSensitive = false)
 
-  errorTest(
+  errorClassTest(
 "missing field",
 nestedRelation2.select($"top.c"),
-

[spark] branch master updated: [SPARK-40953][CONNECT][PYTHON] Fix `DataFrame.head` to collect specified number of rows

2022-10-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 41ccae0922f [SPARK-40953][CONNECT][PYTHON] Fix `DataFrame.head` to 
collect specified number of rows
41ccae0922f is described below

commit 41ccae0922f39aff5dbef236bab78959269ddd5c
Author: Ruifeng Zheng 
AuthorDate: Sun Oct 30 20:34:57 2022 +0900

[SPARK-40953][CONNECT][PYTHON] Fix `DataFrame.head` to collect specified 
number of rows

### What changes were proposed in this pull request?
`head` should return `self.limit(n).toPandas()` instead of `self.toPandas()`

### Why are the changes needed?
Bug fix: `head(n)` called `self.limit(n)` but discarded the result and returned
`self.toPandas()`, so it collected the whole DataFrame instead of the first `n` rows.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing tests

Closes #38424 from zhengruifeng/connect_head_fix.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index bf9ed83615b..03a766aff30 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -215,8 +215,7 @@ class DataFrame(object):
 return GroupingFrame(self, *cols)
 
 def head(self, n: int) -> Optional["pandas.DataFrame"]:
-self.limit(n)
-return self.toPandas()
+return self.limit(n).toPandas()
 
 # TODO(martin.grund) fix mypu
 def join(self, other: "DataFrame", on: Any, how: Optional[str] = None) -> 
"DataFrame":


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40925][SQL][SS] Fix stateful operator late record filtering

2022-10-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 406d0e243cf [SPARK-40925][SQL][SS] Fix stateful operator late record 
filtering
406d0e243cf is described below

commit 406d0e243cfec9b29df946e1a0e20ed5fe25e152
Author: Alex Balikov <91913242+alex-bali...@users.noreply.github.com>
AuthorDate: Mon Oct 31 10:13:50 2022 +0900

[SPARK-40925][SQL][SS] Fix stateful operator late record filtering

### What changes were proposed in this pull request?

This PR fixes the input late record filtering done by stateful operators to 
allow for chaining of stateful operators. Currently stateful operators are 
initialized with the current microbatch watermark and perform both input late 
record filtering and state eviction (e.g. producing aggregations) using the 
same watermark value. The state evicted (or aggregates produced) due to 
watermark advancing is behind the watermark and thus effectively late - if a 
following stateful operator consume [...]

This PR provides two watermark values to the stateful operators: one from the
previous microbatch, used for late record filtering, and one from the current
microbatch (as in the existing code), used for state eviction. This solves the
above problem of broken late record filtering.
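
To make the split concrete, here is a minimal illustrative sketch of the two-watermark idea. The names below are made up for illustration and are not the actual fields or methods added by this patch.
```
// Illustrative only: the previous microbatch's watermark gates late input
// rows, while the current microbatch's watermark drives state eviction.
case class OperatorWatermarks(
    lateRecordFilteringWatermarkMs: Long, // watermark of the previous microbatch
    stateEvictionWatermarkMs: Long)       // watermark of the current microbatch

def acceptInputRow(eventTimeMs: Long, wm: OperatorWatermarks): Boolean =
  eventTimeMs >= wm.lateRecordFilteringWatermarkMs

def shouldEvictState(stateEventTimeMs: Long, wm: OperatorWatermarks): Boolean =
  stateEventTimeMs < wm.stateEvictionWatermarkMs
```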

Note that this PR still does not solve the issue of time-interval stream joins
producing records delayed against the watermark. Therefore a time-interval
streaming join followed by stateful operators is still not supported. That will
be fixed in a follow-up PR (and a SPIP) that effectively replaces the single
global watermark with, conceptually, per-operator watermarks.

Also, the stateful operator chains unblocked by this PR (e.g. a chain of
window aggregations) are still blocked by the unsupported operations checker.
The new test suite for these scenarios, MultiStatefulOperatorsSuite, has to
explicitly disable the unsupported-ops check. This again will be fixed in a
follow-up PR.

### Why are the changes needed?

The PR allows Spark Structured Streaming to support chaining of stateful 
operators e.g. chaining of time window aggregations which is a meaningful 
streaming scenario.

### Does this PR introduce _any_ user-facing change?

With this PR, chains of stateful operators will be supported in Spark 
Structured Streaming.

### How was this patch tested?

Added a new test suite - MultiStatefulOperatorsSuite

Closes #38405 from alex-balikov/multiple_stateful-ops-base.

Lead-authored-by: Alex Balikov 
<91913242+alex-bali...@users.noreply.github.com>
Co-authored-by: Alex Balikov 
Signed-off-by: Jungtaek Lim 
---
 .../sql/catalyst/expressions/SessionWindow.scala   |  24 +-
 .../sql/catalyst/expressions/TimeWindow.scala  |  20 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  16 +
 .../spark/sql/execution/QueryExecution.scala   |   2 +-
 .../spark/sql/execution/SparkStrategies.scala  |   6 +-
 .../spark/sql/execution/aggregate/AggUtils.scala   |   9 +-
 .../FlatMapGroupsInPandasWithStateExec.scala   |  13 +-
 .../streaming/FlatMapGroupsWithStateExec.scala |  27 +-
 .../execution/streaming/IncrementalExecution.scala |  44 ++-
 .../execution/streaming/MicroBatchExecution.scala  |   1 +
 .../sql/execution/streaming/OffsetSeqLog.scala |   4 +
 .../streaming/StreamingSymmetricHashJoinExec.scala |  15 +-
 .../StreamingSymmetricHashJoinHelper.scala |   6 +-
 .../streaming/continuous/ContinuousExecution.scala |   1 +
 .../streaming/sources/MicroBatchWrite.scala|   2 +-
 .../execution/streaming/statefulOperators.scala| 108 +++--
 .../streaming/FlatMapGroupsWithStateSuite.scala|   2 +-
 .../streaming/MultiStatefulOperatorsSuite.scala| 440 +
 .../spark/sql/streaming/StreamingJoinSuite.scala   |   2 +-
 19 files changed, 661 insertions(+), 81 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
index 77e8dfde87b..02273b0c461 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SessionWindow.scala
@@ -68,11 +68,29 @@ case class SessionWindow(timeColumn: Expression, 
gapDuration: Expression) extend
   with Unevaluable
   with NonSQLExpression {
 
+  private def inputTypeOnTimeColumn: AbstractDataType = {
+TypeCollection(
+  AnyTimestampType,
+  // Below two types cover both time window & session window, since they 
produce the same type
+  // of output as window column.
+  new StructType()
+.add(Struct

[spark] branch master updated: [SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator`

2022-10-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 90d31541fb0 [SPARK-40963][SQL] Set nullable correctly in project 
created by `ExtractGenerator`
90d31541fb0 is described below

commit 90d31541fb0313d762cc36067060e6445c04a9b6
Author: Bruce Robbins 
AuthorDate: Mon Oct 31 10:45:17 2022 +0900

[SPARK-40963][SQL] Set nullable correctly in project created by 
`ExtractGenerator`

### What changes were proposed in this pull request?

When creating the project list for the new projection in
`ExtractGenerator`, take into account whether the generator is outer when
setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a
`NullPointerException`. It's a bit of an obscure issue, in that I am
hard-pressed to reproduce it without using a subquery that has an inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, null)
as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, array())
as data(c1, c2)
  )
);

+---+-+
|c1 |c5   |
+---+-+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s 
nullability is incorrect because the new projection created by 
`ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a 
projection list. `generatorOutput` doesn't take into account that 
`explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

`UpdateAttributeNullability` will eventually fix the nullable setting for 
attributes referring to `c3`, but it doesn't fix the `containsNull` setting for 
`c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` 
(from the second example).
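
The gist of the fix can be sketched as follows. This is a hedged illustration of the idea only, not the literal patch to `basicLogicalOperators.scala`: when the generate is an outer generate, the generator's output attributes should be marked nullable before being reused in the new projection.
```
import org.apache.spark.sql.catalyst.expressions.Attribute

// Illustrative sketch: force nullability on the generator output when the
// generate is outer, so that downstream expressions such as CreateArray(c3)
// see the correct nullability/containsNull.
def nullSafeGeneratorOutput(outer: Boolean, generatorOutput: Seq[Attribute]): Seq[Attribute] =
  if (outer) generatorOutput.map(_.withNullability(true)) else generatorOutput
```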

This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(named_struct('a', 1, 'b', 2))),
(2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
(3, array())
as data(c1, c2)
  )
);
22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 
14)
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

New unit test.

Closes #38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala | 17 +
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc12b6522b4..c7b84405412 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2910,7 +2910,7 @@ class Analyzer(override val catalogManag

[spark] branch branch-3.3 updated: [SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator`

2022-10-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new c7ef5600a75 [SPARK-40963][SQL] Set nullable correctly in project 
created by `ExtractGenerator`
c7ef5600a75 is described below

commit c7ef5600a75789152af40804f539a3d075cf2c0c
Author: Bruce Robbins 
AuthorDate: Mon Oct 31 10:45:17 2022 +0900

[SPARK-40963][SQL] Set nullable correctly in project created by 
`ExtractGenerator`

### What changes were proposed in this pull request?

When creating the project list for the new projection in
`ExtractGenerator`, take into account whether the generator is outer when
setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a
`NullPointerException`. It's a bit of an obscure issue, in that I am
hard-pressed to reproduce it without using a subquery that has an inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, null)
as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, array())
as data(c1, c2)
  )
);

+---+-+
|c1 |c5   |
+---+-+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s 
nullability is incorrect because the new projection created by 
`ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a 
projection list. `generatorOutput` doesn't take into account that 
`explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

`UpdateAttributeNullability` will eventually fix the nullable setting for 
attributes referring to `c3`, but it doesn't fix the `containsNull` setting for 
`c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` 
(from the second example).

This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(named_struct('a', 1, 'b', 2))),
(2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
(3, array())
as data(c1, c2)
  )
);
22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 
14)
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

New unit test.

Closes #38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 90d31541fb0313d762cc36067060e6445c04a9b6)
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala | 17 +
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3a3997ff9c7..ad40f924ef8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/or

[spark] branch branch-3.2 updated: [SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator`

2022-10-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 815f32fdf4f [SPARK-40963][SQL] Set nullable correctly in project 
created by `ExtractGenerator`
815f32fdf4f is described below

commit 815f32fdf4f6ea61d1eddb5f2d4d05b26a30f671
Author: Bruce Robbins 
AuthorDate: Mon Oct 31 10:45:17 2022 +0900

[SPARK-40963][SQL] Set nullable correctly in project created by 
`ExtractGenerator`

### What changes were proposed in this pull request?

When creating the project list for the new projection in
`ExtractGenerator`, take into account whether the generator is outer when
setting nullable on generator-related output attributes.

### Why are the changes needed?

This PR fixes an issue that can produce either incorrect results or a
`NullPointerException`. It's a bit of an obscure issue, in that I am
hard-pressed to reproduce it without using a subquery that has an inline table.

Example:
```
select c1, explode(c4) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, null)
as data(c1, c2)
  )
);

+---+---+
|c1 |c5 |
+---+---+
|1  |1  |
|1  |2  |
|2  |2  |
|2  |3  |
|3  |0  |
+---+---+
```
In the last row, `c5` is 0, but should be `NULL`.

Another example:
```
select c1, exists(c4, x -> x is null) as c5 from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(1, 2)),
(2, array(2, 3)),
(3, array())
as data(c1, c2)
  )
);

+---+-+
|c1 |c5   |
+---+-+
|1  |false|
|1  |false|
|2  |false|
|2  |false|
|3  |false|
+---+-+
```
In the last row, `false` should be `true`.

In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s 
nullability is incorrect because the new projection created by 
`ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a 
projection list. `generatorOutput` doesn't take into account that 
`explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost.

`UpdateAttributeNullability` will eventually fix the nullable setting for 
attributes referring to `c3`, but it doesn't fix the `containsNull` setting for 
`c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` 
(from the second example).

This example fails with a `NullPointerException`:
```
select c1, inline_outer(c4) from (
  select c1, array(c3) as c4 from (
select c1, explode_outer(c2) as c3
from values
(1, array(named_struct('a', 1, 'b', 2))),
(2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))),
(3, array())
as data(c1, c2)
  )
);
22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 
14)
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

New unit test.

Closes #38440 from bersprockets/SPARK-40963.

Authored-by: Bruce Robbins 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 90d31541fb0313d762cc36067060e6445c04a9b6)
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../plans/logical/basicLogicalOperators.scala | 17 +
 .../org/apache/spark/sql/GeneratorFunctionSuite.scala | 19 +++
 3 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8d6261a7847..c5b2229db31 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/or

[spark] branch branch-3.3 updated: [SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output

2022-10-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new fb2bdeaa234 [SPARK-40918][SQL][3.3] Mismatch between 
FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output
fb2bdeaa234 is described below

commit fb2bdeaa234b76fdd1e2b6101f714891daa89af9
Author: Juliusz Sompolski 
AuthorDate: Mon Oct 31 14:00:09 2022 +0800

[SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and 
ParquetFileFormat on producing columnar output

### What changes were proposed in this pull request?

We move the decision about supporting columnar output based on WSCG
(whole-stage codegen) one level up, from ParquetFileFormat / OrcFileFormat to
FileSourceScanExec, and pass it down as a new required option to
ParquetFileFormat / OrcFileFormat. The semantics are now as follows:
* `ParquetFileFormat.supportsBatch` and `OrcFileFormat.supportsBatch` return
whether the format **can**, not necessarily **will**, return columnar output.
* To return columnar output, the option `FileFormat.OPTION_RETURNING_BATCH`
needs to be passed to `buildReaderWithPartitionValues` in these two file
formats (see the sketch after this list). It should only be set to `true` if
`supportsBatch` is also `true`, but it can be set to `false` if we don't want
columnar output anyway. This way, `FileSourceScanExec` can set it to `false`
when there are more than 100 columns for WSCG, and `ParquetFileFormat` /
`OrcFileFormat` don't have to concern themselves with WSCG limits.
* To avoid forgetting to pass it, this option is made required. Making it
required means updating a few places that use it, but the error that would
otherwise result from not passing it is very obscure; it's better to fail
early and explicitly here.
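
A hedged sketch of what the scan-side call now looks like, adapted from the snippets quoted later in this message (parameter names mirror those snippets and are not independently verified):
```
// Sketch only: FileSourceScanExec decides whether columnar batches are wanted
// and passes that decision down explicitly, instead of letting the file format
// recompute it from a narrower schema.
val returningBatch = relation.fileFormat.supportBatch(relation.sparkSession, schema)
relation.fileFormat.buildReaderWithPartitionValues(
  sparkSession = relation.sparkSession,
  dataSchema = relation.dataSchema,
  partitionSchema = relation.partitionSchema,
  requiredSchema = requiredSchema,
  filters = pushedDownFilters,
  options = options ++ Map(FileFormat.OPTION_RETURNING_BATCH -> returningBatch.toString),
  hadoopConf = hadoopConf)
```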

### Why are the changes needed?

The following explains the issue for `ParquetFileFormat`; `OrcFileFormat` had
exactly the same issue.

`java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow` was being thrown because 
ParquetReader was outputting columnar batches, while FileSourceScanExec 
expected row output.

The mismatch comes from the fact that `ParquetFileFormat.supportBatch` 
depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the 
threshold is 100 fields.

When this is used in `FileSourceScanExec`:
```
  override lazy val supportsColumnar: Boolean = {
  relation.fileFormat.supportBatch(relation.sparkSession, schema)
  }
```
the `schema` comes from the output attributes, which include extra metadata
attributes.

However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was 
calculated again as
```
  relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = options,
hadoopConf = hadoopConf
...
val resultSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
...
val returningBatch = supportBatch(sparkSession, resultSchema)
```

Where `requiredSchema` and `partitionSchema` wouldn't include the metadata 
columns:
```
FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, 
file_path#6388)
FileSourceScanExec: dataSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
FileSourceScanExec: partitionSchema: StructType()
FileSourceScanExec: requiredSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
```

Columns like `file_path#6388` are added by the scan and contain metadata
produced by the scan itself, not by the file reader, which concerns itself
only with what is inside the file.

### Does this PR introduce _any_ user-facing change?

Not a public API change, but it is now required to pass 
`FileFormat.OPTION_RETURNING_BATCH` in `options` to 
`ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API 
in Apache Spark is `FileSourceScanExec`.

### How was this patch tested?

Tests added

Backports #38397 from juliuszsompolski/SPARK-40918.

Authored-by: Juliusz Sompolski 
Signed-off-by: Wenchen Fan 

Closes #38431 from juliuszsompolski/SPARK-40918-3.3.

Authored-by: Juliusz Sompolski 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 11 +--
 .../sql/execution/datasources/FileFormat.scala | 13 
 .../execution/datasources/orc/OrcFileFormat.scala  | 33 +---
 .../datasou

[spark] branch master updated (90d31541fb0 -> 3c967f06e6c)

2022-10-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 90d31541fb0 [SPARK-40963][SQL] Set nullable correctly in project 
created by `ExtractGenerator`
 add 3c967f06e6c [SPARK-40967][SQL] Migrate `failAnalysis()` onto error 
classes

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   |  165 ++
 .../org/apache/spark/sql/AnalysisException.scala   |   14 +
 .../spark/sql/catalyst/analysis/Analyzer.scala |   24 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   83 +-
 .../catalyst/analysis/ResolveInlineTables.scala|   20 +-
 .../catalyst/analysis/higherOrderFunctions.scala   |   10 +-
 .../spark/sql/catalyst/analysis/package.scala  |   24 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |9 +-
 .../spark/sql/execution/datasources/rules.scala|9 +-
 .../resources/sql-tests/results/ansi/array.sql.out |   32 +-
 .../results/ansi/higher-order-functions.sql.out|   14 +-
 .../test/resources/sql-tests/results/array.sql.out |   32 +-
 .../test/resources/sql-tests/results/count.sql.out |   16 +-
 .../results/higher-order-functions.sql.out |   14 +-
 .../sql-tests/results/inline-table.sql.out |   74 +-
 .../sql-tests/results/postgreSQL/boolean.sql.out   |   19 +-
 .../sql-tests/results/postgreSQL/numeric.sql.out   |   32 +-
 .../results/postgreSQL/window_part3.sql.out|   51 +-
 .../results/postgreSQL/window_part4.sql.out|   19 +-
 .../results/subquery/in-subquery/in-basic.sql.out  |   24 +-
 .../negative-cases/subq-input-typecheck.sql.out|   72 +-
 .../results/table-valued-functions.sql.out |   14 +-
 .../typeCoercion/native/caseWhenCoercion.sql.out   | 1120 +-
 .../results/typeCoercion/native/ifCoercion.sql.out | 1120 +-
 .../typeCoercion/native/inConversion.sql.out   | 2240 ++--
 .../sql-tests/results/udf/udf-inline-table.sql.out |   74 +-
 .../execution/command/PlanResolutionSuite.scala|   22 +-
 27 files changed, 4915 insertions(+), 432 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server

2022-10-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 31bff52431d [SPARK-40713][CONNECT] Improve SET operation support in 
the proto and the server
31bff52431d is described below

commit 31bff52431d25dd36253d3cafa17c233ff6a54f2
Author: Rui Wang 
AuthorDate: Mon Oct 31 14:24:38 2022 +0800

[SPARK-40713][CONNECT] Improve SET operation support in the proto and the 
server

### What changes were proposed in this pull request?

1. Rename `Union` to `SetOperation` and make it cover `except`, 
`intersect`, `union`.
2. Improve server-side support for `SetOperation` (see the usage sketch after this list).
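
A hedged usage sketch of the new DSL helpers (method names follow the diff below; `left` and `right` are assumed to be relations wrapped by the Connect DSL, so treat this as an illustration rather than a verified API):
```
// Sketch only: building set-operation relations with the DSL from this change.
val unionAllPlan  = left.union(right, isAll = true)       // SET_OP_TYPE_UNION, is_all = true
val exceptPlan    = left.except(right, isAll = false)     // SET_OP_TYPE_EXCEPT, distinct
val intersectPlan = left.intersect(right, isAll = false)  // SET_OP_TYPE_INTERSECT, distinct
```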

### Why are the changes needed?

Improve SET operation support in the proto and the server.

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

UT

Closes #38166 from amaliujia/SPARK-40713.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../main/protobuf/spark/connect/relations.proto|  24 +++--
 .../org/apache/spark/sql/connect/dsl/package.scala |  47 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  40 ++--
 .../connect/planner/SparkConnectPlannerSuite.scala |  39 ++--
 .../connect/planner/SparkConnectProtoSuite.scala   |  36 +++
 python/pyspark/sql/connect/plan.py |   6 +-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 104 ++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  95 ---
 8 files changed, 279 insertions(+), 112 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index 94010487ee5..8421535dd8e 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -35,7 +35,7 @@ message Relation {
 Project project = 3;
 Filter filter = 4;
 Join join = 5;
-Union union = 6;
+SetOperation set_op = 6;
 Sort sort = 7;
 Limit limit = 8;
 Aggregate aggregate = 9;
@@ -127,15 +127,19 @@ message Join {
   }
 }
 
-// Relation of type [[Union]], at least one input must be set.
-message Union {
-  repeated Relation inputs = 1;
-  UnionType union_type = 2;
-
-  enum UnionType {
-UNION_TYPE_UNSPECIFIED = 0;
-UNION_TYPE_DISTINCT = 1;
-UNION_TYPE_ALL = 2;
+// Relation of type [[SetOperation]]
+message SetOperation {
+  Relation left_input = 1;
+  Relation right_input = 2;
+  SetOpType set_op_type = 3;
+  bool is_all = 4;
+  bool by_name = 5;
+
+  enum SetOpType {
+SET_OP_TYPE_UNSPECIFIED = 0;
+SET_OP_TYPE_INTERSECT = 1;
+SET_OP_TYPE_UNION = 2;
+SET_OP_TYPE_EXCEPT = 3;
   }
 }
 
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index a9a97e740d8..9ffc4c4a1fe 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Join.JoinType
+import org.apache.spark.connect.proto.SetOperation.SetOpType
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
 
@@ -322,6 +323,52 @@ package object dsl {
 // resolution only by name in the analyzer.
 proto.Relation.newBuilder().setAggregate(agg.build()).build()
   }
+
+  def except(otherPlan: proto.Relation, isAll: Boolean): proto.Relation = {
+proto.Relation
+  .newBuilder()
+  .setSetOp(
+createSetOperation(logicalPlan, otherPlan, 
SetOpType.SET_OP_TYPE_EXCEPT, isAll))
+  .build()
+  }
+
+  def intersect(otherPlan: proto.Relation, isAll: Boolean): proto.Relation 
=
+proto.Relation
+  .newBuilder()
+  .setSetOp(
+createSetOperation(logicalPlan, otherPlan, 
SetOpType.SET_OP_TYPE_INTERSECT, isAll))
+  .build()
+
+  def union(
+  otherPlan: proto.Relation,
+  isAll: Boolean = true,
+  byName: Boolean = false): proto.Relation =
+proto.Relation
+  .newBuilder()
+  .setSetOp(
+createSetOperation(
+  logicalPlan,
+  otherPlan,
+  SetOpType.SET_OP_TYPE_UNION,
+  isAll,
+  byName))
+  .build()
+
+  private def createSetOperation(
+  left: proto.Relation,
+  right: proto.Relation,
+  t: SetOpType,
+  isAll: Boolean = true,
+  byName: Boolean =