[spark] branch master updated: [SPARK-41346][CONNECT][TESTS][FOLLOWUP] Fix `test_connect_function` to import `PandasOnSparkTestCase` properly

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b6ec1ce41d8 [SPARK-41346][CONNECT][TESTS][FOLLOWUP] Fix 
`test_connect_function` to import `PandasOnSparkTestCase` properly
b6ec1ce41d8 is described below

commit b6ec1ce41d8c35a3e9d3ab74c026a7419ded2eae
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 22:58:23 2022 -0800

[SPARK-41346][CONNECT][TESTS][FOLLOWUP] Fix `test_connect_function` to 
import `PandasOnSparkTestCase` properly

### What changes were proposed in this pull request?

This PR aims to fix `test_connect_function` to import `PandasOnSparkTestCase` 
properly. If the `import` is handled correctly, these test cases are skipped as 
intended, because `should_test_connect` assumes `have_pandas`:


https://github.com/apache/spark/blob/97976a5cc915597fd2606602d18c52c075a03bf6/python/pyspark/testing/connectutils.py#L49
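
For context, the gating works roughly like the following minimal sketch (illustrative only; apart from `have_pandas` and `should_test_connect`, the names are not the real file contents):

```
import unittest

try:
    import pandas  # noqa: F401

    have_pandas = True
except ImportError:
    have_pandas = False

# `should_test_connect` assumes pandas is available, so a test class guarded
# by it is skipped cleanly instead of failing at import time.
should_test_connect = have_pandas
connect_requirement_message = None if should_test_connect else "pandas is required"


@unittest.skipIf(not should_test_connect, connect_requirement_message)
class ExampleConnectTest(unittest.TestCase):
    def test_noop(self) -> None:
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()
```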

### Why are the changes needed?

SPARK-41346 imported `PandasOnSparkTestCase` outside of the `if have_pandas:` block:


https://github.com/apache/spark/blob/97976a5cc915597fd2606602d18c52c075a03bf6/python/pyspark/sql/tests/connect/test_connect_function.py#L25-L29

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

**BEFORE**
```
$ python/run-tests --testnames 
pyspark.sql.tests.connect.test_connect_function
...
ModuleNotFoundError: No module named 'pandas'
```

**AFTER**
```
$ python/run-tests --testnames 
pyspark.sql.tests.connect.test_connect_function
...
Skipped tests in pyspark.sql.tests.connect.test_connect_function with 
python3.9:
  test_aggregation_functions 
(pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests) ... 
skip (0.004s)
  test_math_functions 
(pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests) ... 
skip (0.004s)
  test_normal_functions 
(pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests) ... 
skip (0.002s)
  test_sort_with_nulls_order 
(pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests) ... 
skip (0.001s)
  test_sorting_functions_with_column 
(pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests) ... 
skip (0.001s)
```

Closes #38929 from dongjoon-hyun/SPARK-41346.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/tests/connect/test_connect_function.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py 
b/python/pyspark/sql/tests/connect/test_connect_function.py
index 83d27235bdb..6d06421d084 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -24,9 +24,11 @@ from pyspark.sql import SparkSession
 
 if have_pandas:
 from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+else:
+from pyspark.testing.sqlutils import ReusedSQLTestCase as 
PandasOnSparkTestCase
 from pyspark.sql.dataframe import DataFrame
 from pyspark.testing.connectutils import should_test_connect, 
connect_requirement_message
-from pyspark.testing.pandasutils import PandasOnSparkTestCase
 from pyspark.testing.utils import ReusedPySparkTestCase
 
 





[spark] branch master updated: [SPARK-41034][CONNECT][TESTS][FOLLOWUP] `connectutils` should be skipped when pandas is not installed

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 97976a5cc91 [SPARK-41034][CONNECT][TESTS][FOLLOWUP] `connectutils` 
should be skipped when pandas is not installed
97976a5cc91 is described below

commit 97976a5cc915597fd2606602d18c52c075a03bf6
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 22:35:25 2022 -0800

[SPARK-41034][CONNECT][TESTS][FOLLOWUP] `connectutils` should be skipped 
when pandas is not installed

### What changes were proposed in this pull request?

This PR aims to fix two errors.
```
$ python/run-tests --testnames 
pyspark.sql.tests.connect.test_connect_column_expressions
...
NameError: name 'SparkSession' is not defined

...
NameError: name 'LogicalPlan' is not defined
```
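
Both errors come from module-level references to names that only exist when `pandas` is installed. A self-contained sketch of the guarding pattern this fix uses (hypothetical names, not the actual `connectutils` code):

```
try:
    import pandas  # noqa: F401

    have_pandas = True
except ImportError:
    have_pandas = False

if have_pandas:
    # Only resolvable when the optional dependency is installed.
    OptionalFrame = pandas.DataFrame


class Fixture:
    # A class body runs at import time, so an unguarded annotation or decorator
    # that references an optional name raises NameError when pandas is missing.
    if have_pandas:
        frame: OptionalFrame

    if have_pandas:

        @classmethod
        def make_frame(cls) -> OptionalFrame:
            return pandas.DataFrame({"id": [1, 2, 3]})


if __name__ == "__main__":
    print("pandas available:", have_pandas)
```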

### Why are the changes needed?

Previously, `connect` tests were simply skipped when `pandas` was not available; the errors above broke that behavior.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.
```
$ python/run-tests --testnames 
pyspark.sql.tests.connect.test_connect_column_expressions
...
Finished test(python3): 
pyspark.sql.tests.connect.test_connect_column_expressions (0s) ... 9 tests were 
skipped
Tests passed in 0 seconds

Skipped tests in pyspark.sql.tests.connect.test_connect_column_expressions 
with python3:
  test_binary_literal 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.002s)
  test_column_alias 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_column_literals 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_float_nan_inf 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_map_literal 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_null_literal 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_numeric_literal_types 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_simple_column_expressions 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
  test_uuid_literal 
(pyspark.sql.tests.connect.test_connect_column_expressions.SparkConnectColumnExpressionSuite)
 ... skip (0.000s)
```

Closes #38928 from dongjoon-hyun/SPARK-41034.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/testing/connectutils.py | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/testing/connectutils.py 
b/python/pyspark/testing/connectutils.py
index feca9e9f825..05df6b02e67 100644
--- a/python/pyspark/testing/connectutils.py
+++ b/python/pyspark/testing/connectutils.py
@@ -69,7 +69,8 @@ class MockRemoteSession:
 class PlanOnlyTestFixture(unittest.TestCase):
 
 connect: "MockRemoteSession"
-session: SparkSession
+if have_pandas:
+session: SparkSession
 
 @classmethod
 def _read_table(cls, table_name: str) -> "DataFrame":
@@ -95,9 +96,10 @@ class PlanOnlyTestFixture(unittest.TestCase):
 def _session_sql(cls, query: str) -> "DataFrame":
 return DataFrame.withPlan(SQL(query), cls.connect)  # type: ignore
 
-@classmethod
-def _with_plan(cls, plan: LogicalPlan) -> "DataFrame":
-return DataFrame.withPlan(plan, cls.connect)  # type: ignore
+if have_pandas:
+@classmethod
+def _with_plan(cls, plan: LogicalPlan) -> "DataFrame":
+return DataFrame.withPlan(plan, cls.connect)  # type: ignore
 
 @classmethod
 def setUpClass(cls: Any) -> None:





[spark] branch master updated (78532fd7c6d -> 324d0909623)

2022-12-05 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 78532fd7c6d [SPARK-41244][UI] Introducing a Protobuf serializer for UI 
data on KV store
 add 324d0909623 Revert "[SPARK-41369][CONNECT][BUILD] Split connect 
project into common and server projects"

No new revisions were added by this update.

Summary of changes:
 connector/connect/common/pom.xml   | 225 -
 connector/connect/dev/generate_protos.sh   |   2 +-
 connector/connect/{server => }/pom.xml |  85 ++--
 .../connect/{common => }/src/main/buf.gen.yaml |   0
 .../connect/{common => }/src/main/buf.work.yaml|   0
 .../{common => }/src/main/protobuf/buf.yaml|   0
 .../src/main/protobuf/spark/connect/base.proto |   0
 .../src/main/protobuf/spark/connect/commands.proto |   0
 .../main/protobuf/spark/connect/expressions.proto  |   0
 .../main/protobuf/spark/connect/relations.proto|   0
 .../src/main/protobuf/spark/connect/types.proto|   0
 .../spark/sql/connect/SparkConnectPlugin.scala |   0
 .../apache/spark/sql/connect/config/Connect.scala  |   0
 .../org/apache/spark/sql/connect/dsl/package.scala |   0
 .../connect/planner/DataTypeProtoConverter.scala   |   0
 .../sql/connect/planner/SparkConnectPlanner.scala  |   0
 .../service/SparkConnectInterceptorRegistry.scala  |   0
 .../sql/connect/service/SparkConnectService.scala  |   0
 .../service/SparkConnectStreamHandler.scala|   0
 .../src/test/resources/log4j2.properties   |   0
 .../messages/ConnectProtoMessagesSuite.scala   |   0
 .../connect/planner/SparkConnectPlannerSuite.scala |   0
 .../connect/planner/SparkConnectProtoSuite.scala   |   0
 .../connect/planner/SparkConnectServiceSuite.scala |   0
 .../connect/service/InterceptorRegistrySuite.scala |   0
 pom.xml|   3 +-
 project/SparkBuild.scala   | 114 +++
 27 files changed, 98 insertions(+), 331 deletions(-)
 delete mode 100644 connector/connect/common/pom.xml
 rename connector/connect/{server => }/pom.xml (83%)
 rename connector/connect/{common => }/src/main/buf.gen.yaml (100%)
 rename connector/connect/{common => }/src/main/buf.work.yaml (100%)
 rename connector/connect/{common => }/src/main/protobuf/buf.yaml (100%)
 rename connector/connect/{common => 
}/src/main/protobuf/spark/connect/base.proto (100%)
 rename connector/connect/{common => 
}/src/main/protobuf/spark/connect/commands.proto (100%)
 rename connector/connect/{common => 
}/src/main/protobuf/spark/connect/expressions.proto (100%)
 rename connector/connect/{common => 
}/src/main/protobuf/spark/connect/relations.proto (100%)
 rename connector/connect/{common => 
}/src/main/protobuf/spark/connect/types.proto (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
 (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala 
(100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
 (100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala 
(100%)
 rename connector/connect/{server => 
}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 (100%)
 rename connector/connect/{server => }/src/test/resources/log4j2.properties 
(100%)
 rename connector/connect/{server => 
}/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala
 (100%)
 rename connector/connect/{server => 
}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 (100%)
 rename connector/connect/{server => 
}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
 (100%)
 rename connector/connect/{server => 
}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 (100%)
 rename connector/connect/{server => 
}/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
 (100%)





[spark] branch master updated: [SPARK-41244][UI] Introducing a Protobuf serializer for UI data on KV store

2022-12-05 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 78532fd7c6d [SPARK-41244][UI] Introducing a Protobuf serializer for UI 
data on KV store
78532fd7c6d is described below

commit 78532fd7c6d741d501fcc7d375471917d0f79328
Author: Gengliang Wang 
AuthorDate: Mon Dec 5 21:46:31 2022 -0800

[SPARK-41244][UI] Introducing a Protobuf serializer for UI data on KV store

### What changes were proposed in this pull request?

Introducing Protobuf serializer for KV store, which is 3 times as fast as 
the default serializer according to end-to-end benchmark against RocksDB.
| Serializer | Avg Write time(μs) | Avg Read time(μs) | RocksDB File Total Size(MB) | Result total size in memory(MB) |
|---|---|---|---|---|
| Spark’s KV Serializer(JSON+gzip) | 352.2 | 119.26 | 837 | 868 |
| Protobuf | 109.9 | 34.3 | 858 | 2105 |

To move fast and make PR review easier, this PR will:
* Cover the class `JobDataWrapper` only. We can handle more UI data later.
* Not adding configuration for setting serializer in SHS. We will have it 
as a follow-up.

### Why are the changes needed?

A faster serializer for KV store. It supports schema evolution so that in 
the future SHS can leverage it as well.
More details in the SPIP: 
https://docs.google.com/document/d/1cuKnFwlTodyVhUQPMuakq2YDaLH05jaY9FRu_aD1zMo/edit

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Closes #38779 from gengliangwang/protobuf.

Authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../spark/util/kvstore/KVStoreSerializer.java  |   4 +-
 connector/protobuf/pom.xml |   2 +-
 core/pom.xml   |  49 -
 .../apache/spark/status/protobuf/store_types.proto |  57 ++
 .../spark/deploy/history/FsHistoryProvider.scala   |   2 +-
 .../org/apache/spark/status/AppStatusStore.scala   |   6 +-
 .../scala/org/apache/spark/status/KVUtils.scala|  34 --
 .../status/protobuf/JobDataWrapperSerializer.scala | 119 +
 .../protobuf/KVStoreProtobufSerializer.scala   |  34 ++
 .../spark/status/AppStatusListenerSuite.scala  |  14 ++-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  |  81 ++
 pom.xml|   1 +
 project/SparkBuild.scala   |  14 +++
 13 files changed, 398 insertions(+), 19 deletions(-)

diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index ff99d052cf7..02dd73e1a2f 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -49,7 +49,7 @@ public class KVStoreSerializer {
 this.mapper = new ObjectMapper();
   }
 
-  public final byte[] serialize(Object o) throws Exception {
+  public byte[] serialize(Object o) throws Exception {
 if (o instanceof String) {
   return ((String) o).getBytes(UTF_8);
 } else {
@@ -62,7 +62,7 @@ public class KVStoreSerializer {
   }
 
   @SuppressWarnings("unchecked")
-  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
+  public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
 if (klass.equals(String.class)) {
   return (T) new String(data, UTF_8);
 } else {
diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml
index 7057e6148d4..3036fcbf256 100644
--- a/connector/protobuf/pom.xml
+++ b/connector/protobuf/pom.xml
@@ -122,7 +122,7 @@
   
 com.github.os72
 protoc-jar-maven-plugin
-3.11.4
+${protoc-jar-maven-plugin.version}
 
 
   
diff --git a/core/pom.xml b/core/pom.xml
index 182cab90427..a9b40acf5a3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -532,7 +532,12 @@
   org.apache.commons
   commons-crypto
 
-
+
+  com.google.protobuf
+  protobuf-java
+  ${protobuf.version}
+  compile
+
   
   
 
target/scala-${scala.binary.version}/classes
@@ -616,6 +621,48 @@
   
 
   
+  
+org.apache.maven.plugins
+maven-shade-plugin
+
+  false
+  true
+  
+ 

[spark] branch branch-3.3 updated: [SPARK-41350][SQL] Allow simple name access of join hidden columns after subquery alias

2022-12-05 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 398948ef51c [SPARK-41350][SQL] Allow simple name access of join hidden 
columns after subquery alias
398948ef51c is described below

commit 398948ef51ca9dd81be70f0ce02b1b5cebfd02e6
Author: Wenchen Fan 
AuthorDate: Mon Dec 5 10:24:25 2022 +0900

[SPARK-41350][SQL] Allow simple name access of join hidden columns after 
subquery alias



### What changes were proposed in this pull request?

This fixes a regression caused by 
https://github.com/apache/spark/pull/37758 . In 
https://github.com/apache/spark/pull/37758 , we decided to only allow qualified 
name access for using/natural join hidden columns, to fix other problems around 
hidden columns.

We thought that was not a breaking change, as you could only access the join 
hidden columns by qualified names to disambiguate. However, one case was missed: 
when we wrap the join in a subquery alias, the ambiguity is gone and we 
should allow simple name access.

This PR fixes this bug by removing the qualified access only restriction in 
`SubqueryAlias.output`.
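
For a concrete illustration, this is the kind of query the fix unblocks (a hedged PySpark sketch; the table names mirror the SQL tests added below):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("hidden-columns").getOrCreate()
spark.createDataFrame([("one", 1)], "k string, v1 int").createOrReplaceTempView("nt1")
spark.createDataFrame([("one", 2)], "k string, v2 int").createOrReplaceTempView("nt2")

# Inside the join, the hidden key still needs a qualifier (nt2.k), but once the
# join is wrapped in a subquery alias the ambiguity is gone, so simple-name
# access should resolve again instead of failing with UNRESOLVED_COLUMN.
spark.sql("SELECT k FROM (SELECT nt2.k FROM nt1 NATURAL JOIN nt2)").show()
```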

### Why are the changes needed?

fix a regression.

### Does this PR introduce _any_ user-facing change?

Yes, certain queries that failed with `UNRESOLVED_COLUMN` before this PR work now.

### How was this patch tested?

new tests

Closes #38862 from cloud-fan/join.

Authored-by: Wenchen Fan 
Signed-off-by: Hyukjin Kwon 
---
 .../plans/logical/basicLogicalOperators.scala  | 10 -
 .../apache/spark/sql/catalyst/util/package.scala   |  7 
 .../resources/sql-tests/inputs/natural-join.sql|  2 +
 .../test/resources/sql-tests/inputs/using-join.sql |  8 
 .../sql-tests/results/natural-join.sql.out | 10 +
 .../resources/sql-tests/results/using-join.sql.out | 44 ++
 6 files changed, 80 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bdc6e48d08a..69fd825a086 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1329,7 +1329,15 @@ case class SubqueryAlias(
 
   override def output: Seq[Attribute] = {
 val qualifierList = identifier.qualifier :+ alias
-child.output.map(_.withQualifier(qualifierList))
+child.output.map { attr =>
+  // `SubqueryAlias` sets a new qualifier for its output columns. It 
doesn't make sense to still
+  // restrict the hidden columns of natural/using join to be accessed by 
qualified name only.
+  if (attr.qualifiedAccessOnly) {
+attr.markAsAllowAnyAccess().withQualifier(qualifierList)
+  } else {
+attr.withQualifier(qualifierList)
+  }
+}
   }
 
   override def metadataOutput: Seq[Attribute] = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 257749ed6d0..c6374a20c46 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -208,5 +208,12 @@ package object util extends Logging {
 .putBoolean(QUALIFIED_ACCESS_ONLY, true)
 .build()
 )
+
+def markAsAllowAnyAccess(): Attribute = attr.withMetadata(
+  new MetadataBuilder()
+.withMetadata(attr.metadata)
+.remove(QUALIFIED_ACCESS_ONLY)
+.build()
+)
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql 
b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
index 060f15e3d2e..9c9ce6c37ba 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
@@ -50,6 +50,8 @@ SELECT *, nt2.k FROM nt1 natural join nt2;
 
 SELECT nt1.k, nt2.k FROM nt1 natural join nt2;
 
+SELECT k FROM (SELECT nt2.k FROM nt1 natural join nt2);
+
 SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one";
 
 SELECT * FROM (SELECT * FROM nt1 natural join nt2);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/using-join.sql 
b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
index 87390b38876..414221e5b71 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
@@ -19,6 +19,8 @@ SELECT nt1.*, nt2.* FROM nt1 left outer joi

[spark] branch master updated: [SPARK-41399][CONNECT] Refactor column related tests to test_connect_column

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 889c08b4fe4 [SPARK-41399][CONNECT] Refactor column related tests to 
test_connect_column
889c08b4fe4 is described below

commit 889c08b4fe4fb58157a68af5ecd50ec1df10d127
Author: Rui Wang 
AuthorDate: Mon Dec 5 19:11:06 2022 -0800

[SPARK-41399][CONNECT] Refactor column related tests to test_connect_column

### What changes were proposed in this pull request?

Given there is a dedicated `test_connect_column.py` now, we should move 
those column API tests to this place.

### Why are the changes needed?

Codebase refactoring

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing UT

Closes #38925 from amaliujia/move_all_column_tests_to_right_place.

Authored-by: Rui Wang 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/tests/connect/test_connect_basic.py| 51 -
 .../sql/tests/connect/test_connect_column.py   | 52 ++
 2 files changed, 52 insertions(+), 51 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 22ee98558de..1f67f4c49de 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -146,48 +146,6 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 )
 self.assert_eq(joined_plan3.toPandas(), joined_plan4.toPandas())
 
-def test_columns(self):
-# SPARK-41036: test `columns` API for python client.
-df = self.connect.read.table(self.tbl_name)
-df2 = self.spark.read.table(self.tbl_name)
-self.assertEqual(["id", "name"], df.columns)
-
-self.assert_eq(
-df.filter(df.name.rlike("20")).toPandas(), 
df2.filter(df2.name.rlike("20")).toPandas()
-)
-self.assert_eq(
-df.filter(df.name.like("20")).toPandas(), 
df2.filter(df2.name.like("20")).toPandas()
-)
-self.assert_eq(
-df.filter(df.name.ilike("20")).toPandas(), 
df2.filter(df2.name.ilike("20")).toPandas()
-)
-self.assert_eq(
-df.filter(df.name.contains("20")).toPandas(),
-df2.filter(df2.name.contains("20")).toPandas(),
-)
-self.assert_eq(
-df.filter(df.name.startswith("2")).toPandas(),
-df2.filter(df2.name.startswith("2")).toPandas(),
-)
-self.assert_eq(
-df.filter(df.name.endswith("0")).toPandas(),
-df2.filter(df2.name.endswith("0")).toPandas(),
-)
-self.assert_eq(
-df.select(df.name.substr(0, 1).alias("col")).toPandas(),
-df2.select(df2.name.substr(0, 1).alias("col")).toPandas(),
-)
-df3 = self.connect.sql("SELECT cast(null as int) as name")
-df4 = self.spark.sql("SELECT cast(null as int) as name")
-self.assert_eq(
-df3.filter(df3.name.isNull()).toPandas(),
-df4.filter(df4.name.isNull()).toPandas(),
-)
-self.assert_eq(
-df3.filter(df3.name.isNotNull()).toPandas(),
-df4.filter(df4.name.isNotNull()).toPandas(),
-)
-
 def test_collect(self):
 df = self.connect.read.table(self.tbl_name)
 data = df.limit(10).collect()
@@ -369,15 +327,6 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 finally:
 shutil.rmtree(tmpPath)
 
-def test_simple_binary_expressions(self):
-"""Test complex expression"""
-df = self.connect.read.table(self.tbl_name)
-pd = df.select(df.id).where(df.id % lit(30) == 
lit(0)).sort(df.id.asc()).toPandas()
-self.assertEqual(len(pd.index), 4)
-
-res = pandas.DataFrame(data={"id": [0, 30, 60, 90]})
-self.assert_(pd.equals(res), f"{pd.to_string()} != {res.to_string()}")
-
 def test_limit_offset(self):
 df = self.connect.read.table(self.tbl_name)
 pd = df.limit(10).offset(1).toPandas()
diff --git a/python/pyspark/sql/tests/connect/test_connect_column.py 
b/python/pyspark/sql/tests/connect/test_connect_column.py
index 803481508e5..106ab609bfa 100644
--- a/python/pyspark/sql/tests/connect/test_connect_column.py
+++ b/python/pyspark/sql/tests/connect/test_connect_column.py
@@ -20,6 +20,7 @@ from pyspark.testing.sqlutils import have_pandas
 
 if have_pandas:
 from pyspark.sql.connect.functions import lit
+import pandas
 
 
 class SparkConnectTests(SparkConnectSQLTestCase):
@@ -28,6 +29,57 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 df = self.connect.range(10)
 self.assertEqual(9, len(df.filter(df.id != lit(1)).collect()))
 
+def test_colum

[spark] branch master updated (37f1fae3514 -> d34c95161ce)

2022-12-05 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 37f1fae3514 [SPARK-41394][PYTHON][TESTS] Skip `MemoryProfilerTests` 
when pandas is not installed
 add d34c95161ce [SPARK-41369][CONNECT][BUILD] Split connect project into 
common and server projects

No new revisions were added by this update.

Summary of changes:
 connector/connect/common/pom.xml   | 225 +
 .../connect/{ => common}/src/main/buf.gen.yaml |   0
 .../connect/{ => common}/src/main/buf.work.yaml|   0
 .../{ => common}/src/main/protobuf/buf.yaml|   0
 .../src/main/protobuf/spark/connect/base.proto |   0
 .../src/main/protobuf/spark/connect/commands.proto |   0
 .../main/protobuf/spark/connect/expressions.proto  |   0
 .../main/protobuf/spark/connect/relations.proto|   0
 .../src/main/protobuf/spark/connect/types.proto|   0
 connector/connect/dev/generate_protos.sh   |   2 +-
 connector/connect/{ => server}/pom.xml |  85 ++--
 .../spark/sql/connect/SparkConnectPlugin.scala |   0
 .../apache/spark/sql/connect/config/Connect.scala  |   0
 .../org/apache/spark/sql/connect/dsl/package.scala |   0
 .../connect/planner/DataTypeProtoConverter.scala   |   0
 .../sql/connect/planner/SparkConnectPlanner.scala  |   0
 .../service/SparkConnectInterceptorRegistry.scala  |   0
 .../sql/connect/service/SparkConnectService.scala  |   0
 .../service/SparkConnectStreamHandler.scala|   0
 .../src/test/resources/log4j2.properties   |   0
 .../messages/ConnectProtoMessagesSuite.scala   |   0
 .../connect/planner/SparkConnectPlannerSuite.scala |   0
 .../connect/planner/SparkConnectProtoSuite.scala   |   0
 .../connect/planner/SparkConnectServiceSuite.scala |   0
 .../connect/service/InterceptorRegistrySuite.scala |   0
 pom.xml|   3 +-
 project/SparkBuild.scala   | 114 ---
 27 files changed, 331 insertions(+), 98 deletions(-)
 create mode 100644 connector/connect/common/pom.xml
 rename connector/connect/{ => common}/src/main/buf.gen.yaml (100%)
 rename connector/connect/{ => common}/src/main/buf.work.yaml (100%)
 rename connector/connect/{ => common}/src/main/protobuf/buf.yaml (100%)
 rename connector/connect/{ => 
common}/src/main/protobuf/spark/connect/base.proto (100%)
 rename connector/connect/{ => 
common}/src/main/protobuf/spark/connect/commands.proto (100%)
 rename connector/connect/{ => 
common}/src/main/protobuf/spark/connect/expressions.proto (100%)
 rename connector/connect/{ => 
common}/src/main/protobuf/spark/connect/relations.proto (100%)
 rename connector/connect/{ => 
common}/src/main/protobuf/spark/connect/types.proto (100%)
 rename connector/connect/{ => server}/pom.xml (83%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/SparkConnectPlugin.scala 
(100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
 (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala
 (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 (100%)
 rename connector/connect/{ => 
server}/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 (100%)
 rename connector/connect/{ => server}/src/test/resources/log4j2.properties 
(100%)
 rename connector/connect/{ => 
server}/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala
 (100%)
 rename connector/connect/{ => 
server}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 (100%)
 rename connector/connect/{ => 
server}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
 (100%)
 rename connector/connect/{ => 
server}/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 (100%)
 rename connector/connect/{ => 
server}/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
 (100%)





[spark] branch branch-3.3 updated: [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields nullable value consistent

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e8a4fb8fad1 [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file 
_metadata fields nullable value consistent
e8a4fb8fad1 is described below

commit e8a4fb8fad1e203720ee54a5250425b4d71a0bd7
Author: yaohua 
AuthorDate: Mon Dec 5 16:23:07 2022 -0800

[SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields 
nullable value consistent

### What changes were proposed in this pull request?
Cherry-pick https://github.com/apache/spark/pull/38777. Resolved conflicts 
in 
https://github.com/apache/spark/commit/ac2d027a768f50e279a1785ebf4dae1a37b7d3f4

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
N/A

Closes #38910 from Yaohua628/spark-41151-follow-up-3-3.

Authored-by: yaohua 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/execution/datasources/FileFormat.scala | 13 -
 .../execution/datasources/FileMetadataStructSuite.scala  | 16 
 2 files changed, 20 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 0263de8525f..941d2cffe21 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -197,12 +197,15 @@ object FileFormat {
*/
   val OPTION_RETURNING_BATCH = "returning_batch"
 
-  // supported metadata struct fields for hadoop fs relation
+  /**
+   * Schema of metadata struct that can be produced by every file format,
+   * metadata fields for every file format must be *not* nullable.
+   * */
   val METADATA_STRUCT: StructType = new StructType()
-.add(StructField(FILE_PATH, StringType))
-.add(StructField(FILE_NAME, StringType))
-.add(StructField(FILE_SIZE, LongType))
-.add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+.add(StructField(FILE_PATH, StringType, nullable = false))
+.add(StructField(FILE_NAME, StringType, nullable = false))
+.add(StructField(FILE_SIZE, LongType, nullable = false))
+.add(StructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   // create a file metadata struct col
   def createFileMetadataCol: AttributeReference =
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 37016b58431..b53ac3a838f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -267,8 +267,8 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
 val expectedSchema = new StructType()
   .add(StructField("myName", StringType))
   .add(StructField("myAge", IntegerType))
-  .add(StructField("myFileName", StringType))
-  .add(StructField("myFileSize", LongType))
+  .add(StructField("myFileName", StringType, nullable = false))
+  .add(StructField("myFileSize", LongType, nullable = false))
 
 assert(aliasDF.schema.fields.toSet == expectedSchema.fields.toSet)
 
@@ -654,13 +654,21 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
 val queryExecution = df.select("_metadata").queryExecution
 val analyzedSchema = queryExecution.analyzed.schema
 val executedSchema = queryExecution.executedPlan.schema
-assert(analyzedSchema.fields.head.name == "_metadata")
-assert(executedSchema.fields.head.name == "_metadata")
 // For stateful streaming, we store the schema in the state store
 // and check consistency across batches.
 // To avoid state schema compatibility mismatched,
 // we should keep nullability consistent for _metadata struct
+assert(analyzedSchema.fields.head.name == "_metadata")
+assert(executedSchema.fields.head.name == "_metadata")
+
+// Metadata struct is not nullable
 assert(!analyzedSchema.fields.head.nullable)
 assert(analyzedSchema.fields.head.nullable == 
executedSchema.fields.head.nullable)
+
+// All sub-fields all not nullable
+val analyzedStruct = 
analyzedSchema.fields.head.dataType.asInstanceOf[StructType]
+val executedStruct = 
executedSchema.fields.head.dataType.asInstanceOf[StructType]
+assert(analyzedStruct.fields.forall(!_.nullable))
+assert(executedStruct.fields.forall(!_.nullable))
   }
 }



[spark] branch master updated: [SPARK-41394][PYTHON][TESTS] Skip `MemoryProfilerTests` when pandas is not installed

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 37f1fae3514 [SPARK-41394][PYTHON][TESTS] Skip `MemoryProfilerTests` 
when pandas is not installed
37f1fae3514 is described below

commit 37f1fae35149391db80c7b33b6716ab97e0b46a2
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 14:29:54 2022 -0800

[SPARK-41394][PYTHON][TESTS] Skip `MemoryProfilerTests` when pandas is not 
installed

### What changes were proposed in this pull request?

This PR aims to skip `pandas`-related tests of `MemoryProfilerTests` when 
`pandas` is not installed.
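
The pattern is the same as in the connect test modules: gate the test class on availability and defer `import pandas` into the bodies that need it (a minimal sketch with illustrative names):

```
import unittest

try:
    import pandas  # noqa: F401

    have_pandas, pandas_requirement_message = True, None
except ImportError as e:
    have_pandas, pandas_requirement_message = False, str(e)


@unittest.skipIf(not have_pandas, pandas_requirement_message)
class ExampleProfilerTests(unittest.TestCase):
    def exec_pandas_like(self):
        # Imported lazily so that merely importing this module never fails.
        import pandas as pd

        return (pd.Series([1, 2, 3]) + 1).tolist()

    def test_exec(self):
        self.assertEqual(self.exec_pandas_like(), [2, 3, 4])


if __name__ == "__main__":
    unittest.main()
```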

### Why are the changes needed?

For Apache Spark 3.4, this recovers the module-level tests (like `pyspark-core`) 
so that they pass as they did before SPARK-40281 (#38584).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run individual test.
```
python/run-tests --testnames pyspark.tests.test_memory_profiler 
--python-executables python3
```

Closes #38920 from dongjoon-hyun/SPARK-41394.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/tests/test_memory_profiler.py | 14 --
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/tests/test_memory_profiler.py 
b/python/pyspark/tests/test_memory_profiler.py
index 3dc8ce4ce22..cdb75e5b6aa 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -24,16 +24,16 @@ from io import StringIO
 from typing import Iterator
 from unittest import mock
 
-import pandas as pd
-
 from pyspark import SparkConf, SparkContext
 from pyspark.profiler import has_memory_profiler
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import pandas_udf, udf
+from pyspark.testing.sqlutils import have_pandas, pandas_requirement_message
 from pyspark.testing.utils import PySparkTestCase
 
 
 @unittest.skipIf(not has_memory_profiler, "Must have memory-profiler 
installed.")
+@unittest.skipIf(not have_pandas, pandas_requirement_message)  # type: ignore
 class MemoryProfilerTests(PySparkTestCase):
 def setUp(self):
 self._old_sys_path = list(sys.path)
@@ -103,6 +103,8 @@ class MemoryProfilerTests(PySparkTestCase):
 self.spark.range(10).select(plus_one("id")).collect()
 
 def exec_pandas_udf_ser_to_ser(self):
+import pandas as pd
+
 @pandas_udf("int")
 def ser_to_ser(ser: pd.Series) -> pd.Series:
 return ser + 1
@@ -110,6 +112,8 @@ class MemoryProfilerTests(PySparkTestCase):
 self.spark.range(10).select(ser_to_ser("id")).collect()
 
 def exec_pandas_udf_ser_to_scalar(self):
+import pandas as pd
+
 @pandas_udf("int")
 def ser_to_scalar(ser: pd.Series) -> float:
 return ser.median()
@@ -118,6 +122,8 @@ class MemoryProfilerTests(PySparkTestCase):
 
 # Unsupported
 def exec_pandas_udf_iter_to_iter(self):
+import pandas as pd
+
 @pandas_udf("int")
 def iter_to_iter(batch_ser: Iterator[pd.Series]) -> 
Iterator[pd.Series]:
 for ser in batch_ser:
@@ -126,6 +132,8 @@ class MemoryProfilerTests(PySparkTestCase):
 self.spark.range(10).select(iter_to_iter("id")).collect()
 
 def exec_grouped_map(self):
+import pandas as pd
+
 def grouped_map(pdf: pd.DataFrame) -> pd.DataFrame:
 return pdf.assign(v=pdf.v - pdf.v.mean())
 
@@ -134,6 +142,8 @@ class MemoryProfilerTests(PySparkTestCase):
 
 # Unsupported
 def exec_map(self):
+import pandas as pd
+
 def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
 for pdf in pdfs:
 yield pdf[pdf.id == 1]





[spark] branch master updated: [SPARK-41390][SQL] Update the script used to generate `register` function in `UDFRegistration`

2022-12-05 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 11cebdbdd0e [SPARK-41390][SQL] Update the script used to generate 
`register` function in `UDFRegistration`
11cebdbdd0e is described below

commit 11cebdbdd0e6d83cbde5f1cb5e4802a7dd5ada48
Author: yangjie01 
AuthorDate: Mon Dec 5 23:11:23 2022 +0300

[SPARK-41390][SQL] Update the script used to generate `register` function 
in `UDFRegistration`

### What changes were proposed in this pull request?
SPARK-35065 used `QueryCompilationErrors.invalidFunctionArgumentsError` 
instead of `throw new AnalysisException(...)` in the `register` functions of 
`UDFRegistration`, but the script used to generate those `register` functions was 
not updated, so this PR updates the script.

### Why are the changes needed?
Update the script used to generate `register` function in `UDFRegistration`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually checked the results of the script.

Closes #38916 from LuciferYang/register-func-script.

Authored-by: yangjie01 
Signed-off-by: Max Gekk 
---
 sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 99820336477..80550dc21d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -145,8 +145,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
 |  def builder(e: Seq[Expression]) = if (e.length == $x) {
 |finalUdf.createScalaUDF(e)
 |  } else {
-|throw new AnalysisException("Invalid number of arguments for 
function " + name +
-|  ". Expected: $x; Found: " + e.length)
+|throw QueryCompilationErrors.invalidFunctionArgumentsError(name, 
"$x", e.length)
 |  }
 |  functionRegistry.createOrReplaceTempFunction(name, builder, 
"scala_udf")
 |  finalUdf
@@ -171,8 +170,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
 |  def builder(e: Seq[Expression]) = if (e.length == $i) {
 |ScalaUDF(func, replaced, e, Nil, udfName = Some(name))
 |  } else {
-|throw new AnalysisException("Invalid number of arguments for 
function " + name +
-|  ". Expected: $i; Found: " + e.length)
+|throw QueryCompilationErrors.invalidFunctionArgumentsError(name, 
"$i", e.length)
 |  }
 |  functionRegistry.createOrReplaceTempFunction(name, builder, 
"java_udf")
 |}""".stripMargin)





[spark] branch master updated (d2d1b50bfac -> d0a73f80e25)

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d2d1b50bfac [SPARK-41372][CONNECT][PYTHON] Implement DataFrame TempView
 add d0a73f80e25 [SPARK-40419][SQL][TESTS][FOLLOWUP] Remove 
results/udaf.sql.out

No new revisions were added by this update.

Summary of changes:
 .../test/resources/sql-tests/results/udaf.sql.out  | 67 --
 1 file changed, 67 deletions(-)
 delete mode 100644 sql/core/src/test/resources/sql-tests/results/udaf.sql.out





[spark] branch master updated: [SPARK-41372][CONNECT][PYTHON] Implement DataFrame TempView

2022-12-05 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d2d1b50bfac [SPARK-41372][CONNECT][PYTHON] Implement DataFrame TempView
d2d1b50bfac is described below

commit d2d1b50bfacf1c5bdcf56f150ae44d1b7e5cb5a6
Author: Rui Wang 
AuthorDate: Mon Dec 5 09:20:10 2022 -0800

[SPARK-41372][CONNECT][PYTHON] Implement DataFrame TempView

### What changes were proposed in this pull request?

Implement DataFrame TempView (i.e. `createTempView` and 
`createOrReplaceTempView`). These create a session-local temp view, which is 
different from a global temp view.
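
A hedged usage sketch of the new API (mirroring the existing PySpark semantics):

```
from pyspark.sql import SparkSession

# `spark` could equally be a Spark Connect session; a local one keeps the
# sketch self-contained.
spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.sql("SELECT 1 AS X")
df.createTempView("view_local_temp")           # raises if the name already exists
df.createOrReplaceTempView("view_local_temp")  # silently replaces an existing view
assert spark.sql("SELECT * FROM view_local_temp").count() == 1
# Unlike createGlobalTempView, the view is dropped when this session ends.
```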

### Why are the changes needed?

API coverage.

### Does this PR introduce _any_ user-facing change?

NO
### How was this patch tested?

UT

Closes #38891 from amaliujia/createTempView.

Authored-by: Rui Wang 
Signed-off-by: Xinrong Meng 
---
 python/pyspark/sql/connect/dataframe.py| 38 ++
 .../sql/tests/connect/test_connect_basic.py| 14 +++-
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 8e8a5f4d318..026b7e6099f 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1554,6 +1554,44 @@ class DataFrame(object):
 """
 print(self._explain_string(extended=extended, mode=mode))
 
+def createTempView(self, name: str) -> None:
+"""Creates a local temporary view with this :class:`DataFrame`.
+
+The lifetime of this temporary table is tied to the 
:class:`SparkSession`
+that was used to create this :class:`DataFrame`.
+throws :class:`TempTableAlreadyExistsException`, if the view name 
already exists in the
+catalog.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+name : str
+Name of the view.
+"""
+command = plan.CreateView(
+child=self._plan, name=name, is_global=False, replace=False
+).command(session=self._session.client)
+self._session.client.execute_command(command)
+
+def createOrReplaceTempView(self, name: str) -> None:
+"""Creates or replaces a local temporary view with this 
:class:`DataFrame`.
+
+The lifetime of this temporary table is tied to the 
:class:`SparkSession`
+that was used to create this :class:`DataFrame`.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+name : str
+Name of the view.
+"""
+command = plan.CreateView(
+child=self._plan, name=name, is_global=False, replace=True
+).command(session=self._session.client)
+self._session.client.execute_command(command)
+
 def createGlobalTempView(self, name: str) -> None:
 """Creates a global temporary view with this :class:`DataFrame`.
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index abab47b36bf..22ee98558de 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -530,11 +530,23 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 self.connect.sql("SELECT 2 AS X LIMIT 
1").createOrReplaceGlobalTempView("view_1")
 
self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
 
-# Test when creating a view which is alreayd exists but
+# Test when creating a view which is already exists but
 
self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
 with self.assertRaises(grpc.RpcError):
 self.connect.sql("SELECT 1 AS X LIMIT 
0").createGlobalTempView("view_1")
 
+def test_create_session_local_temp_view(self):
+# SPARK-41372: test session local temp view creation.
+with self.tempView("view_local_temp"):
+self.connect.sql("SELECT 1 AS X").createTempView("view_local_temp")
+self.assertEqual(self.connect.sql("SELECT * FROM 
view_local_temp").count(), 1)
+self.connect.sql("SELECT 1 AS X LIMIT 
0").createOrReplaceTempView("view_local_temp")
+self.assertEqual(self.connect.sql("SELECT * FROM 
view_local_temp").count(), 0)
+
+# Test when creating a view which is already exists but
+with self.assertRaises(grpc.RpcError):
+self.connect.sql("SELECT 1 AS X LIMIT 
0").createTempView("view_local_temp")
+
 def test_to_pandas(self):
 # SPARK-41005: Test to pandas
 query = """



[spark] branch master updated: [SPARK-41389][CORE][SQL] Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1044`

2022-12-05 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1996a94b09f [SPARK-41389][CORE][SQL] Reuse `WRONG_NUM_ARGS` instead of 
`_LEGACY_ERROR_TEMP_1044`
1996a94b09f is described below

commit 1996a94b09fe1f450eb33ddb23b16af090bc4d1b
Author: yangjie01 
AuthorDate: Mon Dec 5 18:04:51 2022 +0300

[SPARK-41389][CORE][SQL] Reuse `WRONG_NUM_ARGS` instead of 
`_LEGACY_ERROR_TEMP_1044`

### What changes were proposed in this pull request?
This PR aims to reuse the error class `WRONG_NUM_ARGS` instead of 
`_LEGACY_ERROR_TEMP_1044`.

### Why are the changes needed?
Proper names of error classes to improve user experience with Spark SQL.
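
The user-visible effect, sketched in PySpark (hedged; based on the updated `sql-compatibility-functions.sql.out` below):

```
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("local[1]").getOrCreate()

try:
    # The `string` cast alias accepts exactly one argument.
    spark.sql("SELECT string('a', 'b')").collect()
except AnalysisException as e:
    # Now reported under error class WRONG_NUM_ARGS (expectedNum=1, actualNum=2)
    # instead of the removed _LEGACY_ERROR_TEMP_1044.
    print(e)
```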

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github Actions.

Closes #38913 from LuciferYang/SPARK-41389.

Authored-by: yangjie01 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json| 5 -
 .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala   | 5 +++--
 .../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala  | 6 --
 .../resources/sql-tests/results/sql-compatibility-functions.sql.out | 6 --
 4 files changed, 7 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 7d5c272a77f..19ab5ada2b5 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2011,11 +2011,6 @@
   "Invalid arguments for function ."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1044" : {
-"message" : [
-  "Function  accepts only one argument."
-]
-  },
   "_LEGACY_ERROR_TEMP_1045" : {
 "message" : [
   "ALTER TABLE SET LOCATION does not support partition for v2 tables."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3817f00d09d..be16eaec6ac 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -896,8 +896,9 @@ object FunctionRegistry {
   name: String,
   dataType: DataType): (String, (ExpressionInfo, FunctionBuilder)) = {
 val builder = (args: Seq[Expression]) => {
-  if (args.size != 1) {
-throw QueryCompilationErrors.functionAcceptsOnlyOneArgumentError(name)
+  val argSize = args.size
+  if (argSize != 1) {
+throw QueryCompilationErrors.invalidFunctionArgumentsError(name, "1", 
argSize)
   }
   Cast(args.head, dataType)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 2e20d7aec8d..ed08e33829e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -663,12 +663,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 }
   }
 
-  def functionAcceptsOnlyOneArgumentError(name: String): Throwable = {
-new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1044",
-  messageParameters = Map("name" -> name))
-  }
-
   def alterV2TableSetLocationWithPartitionNotSupportedError(): Throwable = {
 new AnalysisException(
   errorClass = "_LEGACY_ERROR_TEMP_1045",
diff --git 
a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
index e0d5874d058..319ac059385 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/sql-compatibility-functions.sql.out
@@ -94,9 +94,11 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1044",
+  "errorClass" : "WRONG_NUM_ARGS",
   "messageParameters" : {
-"name" : "string"
+"actualNum" : "2",
+"expectedNum" : "1",
+"functionName" : "`string`"
   },
   "queryContext" : [ {
 "objectType" : "",





[GitHub] [spark-website] srowen commented on pull request #429: Change download text for spark 3.2.3. from Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


srowen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337440852

   Right, but this change is affecting both. Maybe just say "Hadoop 3"





[spark] branch master updated (e234cd8276a -> 986cf2c7a7a)

2022-12-05 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from e234cd8276a [SPARK-41388][K8S] `getReusablePVCs` should ignore 
recently created PVCs in the previous batch
 add 986cf2c7a7a [SPARK-41167][SQL] Improve multi like performance by 
creating a balanced expression tree predicate

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/optimizer/expressions.scala  | 10 +-
 .../sql/catalyst/optimizer/LikeSimplificationSuite.scala   | 14 +++---
 2 files changed, 12 insertions(+), 12 deletions(-)





[GitHub] [spark-website] bjornjorgensen commented on pull request #429: Change download text for spark 3.2.3. from Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


bjornjorgensen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337390183

   
![image](https://user-images.githubusercontent.com/47577197/205652270-2afc23c8-35d1-4602-b27c-9361b8b6f595.png)
   
   
![image](https://user-images.githubusercontent.com/47577197/205652513-fceb9969-9f3e-44b5-aae2-32028fb6b0b4.png)
   





[GitHub] [spark-website] bjornjorgensen commented on pull request #429: Change download text for spark 3.2.3. from Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


bjornjorgensen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337373595

   spark-3.2.3-bin-hadoop3.2.tgz
   
   Spark 3.2.3 hadoop 3.2
   
   





[GitHub] [spark-website] srowen commented on pull request #429: Change download text for spark 3.2.3. from Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


srowen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337337921

   Yeah, this changes the name for Spark 3.2 and 3.3. To be clear, what is the correct/desired display here: Hadoop 3.2 with Spark 3.2?





[GitHub] [spark-website] bjornjorgensen commented on pull request #429: Change Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


bjornjorgensen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337038144

   @dongjoon-hyun It's only for 3.2.3. Spark 3.2.3 ships with Hadoop 3.2, not 3.3 as the dropdown says.





[spark] branch branch-3.2 updated: [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new aac8d0a7e1d [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch
aac8d0a7e1d is described below

commit aac8d0a7e1d20d9fd9fbabcbd80ea1df2aa40225
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 01:09:50 2022 -0800

[SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs from the immediately previous batch by excluding newly created PVCs whose age is still within `spark.kubernetes.allocation.batch.delay`.

When the K8s control plane is slow, `spark.kubernetes.allocation.batch.delay` is relatively too short, or `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs from the previous batch are reusable for the next batch.

No.

Pass the CIs with the newly created test case.

Closes #38912 from dongjoon-hyun/SPARK-41388.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit e234cd8276a603ab8a191dd078b11c605b22a50c)
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 651f5da7d58554ebd4b15c5b0204acf2d08ca439)
Signed-off-by: Dongjoon Hyun 
---
 .../cluster/k8s/ExecutorPodsAllocator.scala|  6 +-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 23 --
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index dc79f2205d5..72bdaed89c9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -364,7 +364,11 @@ private[spark] class ExecutorPodsAllocator(
         .getItems
         .asScala
 
-      val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+      val now = Instant.now().toEpochMilli
+      val reusablePVCs = createdPVCs
+        .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+        .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
+          > podAllocationDelay)
       logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
       reusablePVCs
     } else {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 5b33da6d2b5..6dbf45ddd61 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -17,9 +17,11 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit.MILLIS
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
@@ -678,8 +680,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
     when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
-    when(persistentVolumeClaimList.getItems)
-      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    pvc.getMetadata
+      .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
       meq(kubernetesClient), any(classOf[ResourceProfile])))
       .thenAnswer((invocation: InvocationOnMock) => {
@@ -742,6 +746,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
      " namespace

[GitHub] [spark-website] bjornjorgensen commented on pull request #429: Change Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


bjornjorgensen commented on PR #429:
URL: https://github.com/apache/spark-website/pull/429#issuecomment-1337018346

   ![hadoop3 2](https://user-images.githubusercontent.com/47577197/205601369-59a620c6-6947-43f0-9955-0e2bcf1c362e.png)
   
   @srowen Can you fix this?





[GitHub] [spark-website] bjornjorgensen opened a new pull request, #429: Change Apache Hadoop 3.3 to Apache Hadoop 3.2

2022-12-05 Thread GitBox


bjornjorgensen opened a new pull request, #429:
URL: https://github.com/apache/spark-website/pull/429

   Change Apache Hadoop 3.3 to Apache Hadoop 3.2
   
   I have not 
   
   Make sure that you generate site HTML with `bundle exec jekyll build`, and include the changes to the HTML in your pull request. See README.md for more information.
   
   
   





[spark] branch branch-3.3 updated: [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 651f5da7d58 [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch
651f5da7d58 is described below

commit 651f5da7d58554ebd4b15c5b0204acf2d08ca439
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 01:09:50 2022 -0800

[SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs from the immediately previous batch by excluding newly created PVCs whose age is still within `spark.kubernetes.allocation.batch.delay`.

When the K8s control plane is slow, `spark.kubernetes.allocation.batch.delay` is relatively too short, or `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs from the previous batch are reusable for the next batch.

No.

Pass the CIs with the newly created test case.

Closes #38912 from dongjoon-hyun/SPARK-41388.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit e234cd8276a603ab8a191dd078b11c605b22a50c)
Signed-off-by: Dongjoon Hyun 
---
 .../cluster/k8s/ExecutorPodsAllocator.scala|  6 +-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 23 --
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 3519efd3fcb..86fe61077bf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -367,7 +367,11 @@ class ExecutorPodsAllocator(
         .getItems
         .asScala
 
-      val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+      val now = Instant.now().toEpochMilli
+      val reusablePVCs = createdPVCs
+        .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+        .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
+          > podAllocationDelay)
       logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
       reusablePVCs
     } else {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d263bd00731..856ee7031c2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -17,9 +17,11 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit.MILLIS
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
@@ -697,8 +699,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
     when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
-    when(persistentVolumeClaimList.getItems)
-      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    pvc.getMetadata
+      .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
       meq(kubernetesClient), any(classOf[ResourceProfile])))
       .thenAnswer((invocation: InvocationOnMock) => {
@@ -761,6 +765,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       " namespace default"))
   }
 
+  test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
+ 

[spark] branch master updated: [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

2022-12-05 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e234cd8276a [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch
e234cd8276a is described below

commit e234cd8276a603ab8a191dd078b11c605b22a50c
Author: Dongjoon Hyun 
AuthorDate: Mon Dec 5 01:09:50 2022 -0800

[SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch

### What changes were proposed in this pull request?

This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs from the immediately previous batch by excluding newly created PVCs whose age is still within `spark.kubernetes.allocation.batch.delay`.

### Why are the changes needed?

When the K8s control plane is slow, `spark.kubernetes.allocation.batch.delay` is relatively too short, or `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs from the previous batch are reusable for the next batch.
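
As a rough illustration of the resulting rule, here is a standalone sketch that mirrors the names used in the diff below but is not the actual `ExecutorPodsAllocator` code; `podAllocationDelay` is assumed to be the batch delay in milliseconds. A PVC is kept only when it is not in use and is older than the batch delay.

```scala
import java.time.Instant

object ReusablePvcFilterSketch {
  // Illustrative stand-in for a PVC's metadata (name + ISO-8601 creation timestamp).
  final case class Pvc(name: String, creationTimestamp: String)

  // Keep PVCs that are not in use and whose age exceeds the allocation batch delay.
  def reusable(createdPVCs: Seq[Pvc], pvcsInUse: Set[String], podAllocationDelay: Long): Seq[Pvc] = {
    val now = Instant.now().toEpochMilli
    createdPVCs
      .filterNot(pvc => pvcsInUse.contains(pvc.name))
      .filter(pvc => now - Instant.parse(pvc.creationTimestamp).toEpochMilli > podAllocationDelay)
  }

  def main(args: Array[String]): Unit = {
    val justCreated = Pvc("pvc-1", Instant.now().toString)
    val oldEnough = Pvc("pvc-0", Instant.now().minusSeconds(60).toString)
    // With a 5-second batch delay, only the older PVC survives the filter: List(pvc-0)
    println(reusable(Seq(justCreated, oldEnough), Set.empty, podAllocationDelay = 5000L).map(_.name))
  }
}
```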

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly created test case.

Closes #38912 from dongjoon-hyun/SPARK-41388.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../cluster/k8s/ExecutorPodsAllocator.scala|  6 +-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 22 --
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 524ab0c845c..d8ae910b1ae 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -373,7 +373,11 @@ class ExecutorPodsAllocator(
         .getItems
         .asScala
 
-      val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+      val now = Instant.now().toEpochMilli
+      val reusablePVCs = createdPVCs
+        .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+        .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
+          > podAllocationDelay)
       logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
       reusablePVCs
     } catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index caec9ef9201..c526bf0968e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit.MILLIS
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
@@ -721,8 +722,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .set(s"$prefix.option.sizeLimit", "200Gi")
       .set(s"$prefix.option.storageClass", "gp2")
 
-    when(persistentVolumeClaimList.getItems)
-      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    pvc.getMetadata
+      .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
 
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
       meq(kubernetesClient), any(classOf[ResourceProfile])))
       .thenAnswer((invocation: InvocationOnMock) => {
@@ -791,6 +794,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
   }
 
+  test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
+    val getReusablePVCs =
+      PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))
+
+    val pvc1 = persi

[spark] branch master updated: [SPARK-41355][SQL] Workaround hive table name validation issue

2022-12-05 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8319cec9869 [SPARK-41355][SQL] Workaround hive table name validation issue
8319cec9869 is described below

commit 8319cec9869ffece624ad4117fea31d452c45e5b
Author: Kun Wan 
AuthorDate: Mon Dec 5 16:19:21 2022 +0800

[SPARK-41355][SQL] Workaround hive table name validation issue

### What changes were proposed in this pull request?

Restore dbName and tableName in `HiveShim.getTable()` method.

When we create a Hive table, Hive converts the dbName and tableName to lower case:
https://github.com/apache/hive/blob/release-2.3.9-rc0/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1446-L1482

Hive then throws an exception in `Hive.alterTable()`:


https://github.com/apache/hive/blob/release-2.3.9-rc0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L623

For example:
* We want to create a table called `tAb_I`.
* The Hive metastore checks whether the table name is valid via `MetaStoreUtils.validateName(tbl.getTableName())`.
* Hive calls `HiveStringUtils.normalizeIdentifier(tbl.getTableName())` and stores the table name in lower case, **but after setting the locale to "tr" the lowercased name becomes `tab_ı`, which is not a valid table name** (see the sketch after this list).
* When we run an ALTER TABLE command, we first fetch the Hive table from the metastore, whose stored name is no longer valid.
* We update some properties and then try to save the table back to the Hive metastore.
* The Hive metastore checks whether the table name is valid and then throws `org.apache.hadoop.hive.ql.metadata.HiveException: [tab_ı]: is not a valid table name`.
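
A minimal, self-contained demonstration of the locale behaviour described above (plain JVM string handling, not Spark or Hive code): lowercasing an identifier containing `I` under the Turkish locale produces the dotless `ı`.

```scala
import java.util.Locale

object TurkishLocaleLowercase {
  def main(args: Array[String]): Unit = {
    val name = "tAb_I"
    // Root locale keeps the expected ASCII lowercase.
    println(name.toLowerCase(Locale.ROOT))      // tab_i
    // Turkish locale maps 'I' to dotless 'ı', which Hive rejects.
    println(name.toLowerCase(new Locale("tr"))) // tab_ı
  }
}
```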

### Why are the changes needed?

Bug fix for the ALTER TABLE command.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add UT

Closes #38765 from wankunde/write_stats_directly2.

Authored-by: Kun Wan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/execution/command/DDLSuite.scala | 19 +++
 .../org/apache/spark/sql/hive/client/HiveShim.scala   |  7 ++-
 .../org/apache/spark/sql/hive/StatisticsSuite.scala   |  2 +-
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6d4e907835e..1e12340d983 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1963,6 +1963,25 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
 }
   }
 
+  test(s"Support alter table command with CASE_SENSITIVE is true") {
+withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"true") {
+  withLocale("tr") {
+val dbName = "DaTaBaSe_I"
+withDatabase(dbName) {
+  sql(s"CREATE DATABASE $dbName")
+  sql(s"USE $dbName")
+
+  val tabName = "tAb_I"
+  withTable(tabName) {
+sql(s"CREATE TABLE $tabName(col_I int) USING PARQUET")
+sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('foo' = 'a')")
+checkAnswer(sql(s"SELECT col_I FROM $tabName"), Nil)
+  }
+}
+  }
+}
+  }
+
   test("set command rejects SparkConf entries") {
 val ex = intercept[AnalysisException] {
   sql(s"SET ${config.CPUS_PER_TASK.key} = 4")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 95e5582cb8c..5e5d2757e9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -620,7 +620,12 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   tableName: String,
   throwException: Boolean): Table = {
 recordHiveCall()
-hive.getTable(dbName, tableName, throwException)
+val table = hive.getTable(dbName, tableName, throwException)
+if (table != null) {
+  table.getTTable.setTableName(tableName)
+  table.getTTable.setDbName(dbName)
+}
+table
   }
 
   override def getTablesByPattern(hive: Hive, dbName: String, pattern: String): Seq[String] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a1f34945868..4b69a01834d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -537,7 +537,7 @@ class Statist