[spark] branch master updated: [SPARK-44030][SQL][FOLLOW-UP] Move unapply from AnyTimestampType to AnyTimestampTypeExpression

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 618b52097c0 [SPARK-44030][SQL][FOLLOW-UP] Move unapply from 
AnyTimestampType to AnyTimestampTypeExpression
618b52097c0 is described below

commit 618b52097c07105d734aaf9b2a22b372920b3f31
Author: Rui Wang 
AuthorDate: Fri Jun 30 08:38:39 2023 +0300

[SPARK-44030][SQL][FOLLOW-UP] Move unapply from AnyTimestampType to 
AnyTimestampTypeExpression

### What changes were proposed in this pull request?

Move unapply from AnyTimestampType to AnyTimestampTypeExpression.

### Why are the changes needed?

To align with the ongoing effort to use a separate type expression class to host `unapply`.
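
For context, a minimal sketch of the extractor-object pattern this follow-up continues; the object below is an assumption about the shape of the real `AnyTimestampTypeExpression`, not a copy of it.

```scala
// Hedged sketch (not the actual Spark source): the DataType object stays a pure
// type descriptor, while a separate *TypeExpression object hosts unapply so
// analyzer rules can pattern-match on an expression's data type.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}

object AnyTimestampTypeExpressionSketch {
  // Matches any expression whose data type is TIMESTAMP or TIMESTAMP_NTZ, so
  // rules can write `case AnyTimestampTypeExpressionSketch() => ...`.
  def unapply(e: Expression): Boolean =
    e.dataType == TimestampType || e.dataType == TimestampNTZType
}
```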

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #41771 from amaliujia/atomic_datatype_expression.

Authored-by: Rui Wang 
Signed-off-by: Max Gekk 
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  |  4 ++--
 .../spark/sql/catalyst/analysis/AnsiTypeCoercion.scala | 14 --
 .../apache/spark/sql/catalyst/analysis/TypeCoercion.scala  | 12 +++-
 .../org/apache/spark/sql/types/AbstractDataType.scala  |  3 ---
 .../org/apache/spark/sql/types/DataTypeExpression.scala|  5 +
 5 files changed, 22 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8a192a4c132..32cec909401 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -428,8 +428,8 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
   UnaryMinus(r, mode == EvalMode.ANSI), ansiEnabled = mode == 
EvalMode.ANSI))
   case (_, CalendarIntervalType | _: DayTimeIntervalType) =>
 Cast(DatetimeSub(l, r, TimeAdd(l, UnaryMinus(r, mode == 
EvalMode.ANSI))), l.dataType)
-  case _ if AnyTimestampType.unapply(l) || AnyTimestampType.unapply(r) 
=>
-SubtractTimestamps(l, r)
+  case _ if AnyTimestampTypeExpression.unapply(l) ||
+AnyTimestampTypeExpression.unapply(r) => SubtractTimestamps(l, r)
   case (_, DateType) => SubtractDates(l, r)
   case (DateType, dt) if dt != StringType => DateSub(l, r)
   case _ => s
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
index d3f20f87493..5854f42a061 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
@@ -284,7 +284,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   // Skip nodes who's children have not been resolved yet.
   case g if !g.childrenResolved => g
 
-  case g: GetDateField if AnyTimestampType.unapply(g.child) =>
+  case g: GetDateField if AnyTimestampTypeExpression.unapply(g.child) =>
 g.withNewChildren(Seq(Cast(g.child, DateType)))
 }
   }
@@ -294,14 +294,16 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
-  case d @ DateAdd(AnyTimestampType(), _) => d.copy(startDate = 
Cast(d.startDate, DateType))
-  case d @ DateSub(AnyTimestampType(), _) => d.copy(startDate = 
Cast(d.startDate, DateType))
+  case d @ DateAdd(AnyTimestampTypeExpression(), _) =>
+d.copy(startDate = Cast(d.startDate, DateType))
+  case d @ DateSub(AnyTimestampTypeExpression(), _) =>
+d.copy(startDate = Cast(d.startDate, DateType))
 
-  case s @ SubtractTimestamps(DateTypeExpression(), AnyTimestampType(), _, 
_) =>
+  case s @ SubtractTimestamps(DateTypeExpression(), 
AnyTimestampTypeExpression(), _, _) =>
 s.copy(left = Cast(s.left, s.right.dataType))
-  case s @ SubtractTimestamps(AnyTimestampType(), DateTypeExpression(), _, 
_) =>
+  case s @ SubtractTimestamps(AnyTimestampTypeExpression(), 
DateTypeExpression(), _, _) =>
 s.copy(right = Cast(s.right, s.left.dataType))
-  case s @ SubtractTimestamps(AnyTimestampType(), AnyTimestampType(), _, _)
+  case s @ SubtractTimestamps(AnyTimestampTypeExpression(), 
AnyTimestampTypeExpression(), _, _)
 if s.left.dataType != s.right.dataType =>
 val newLeft = castIfNotSameType(s.left, TimestampNTZType)
 val newRight = castIfNotSameType(s.right, 

[spark] branch master updated: [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch,mask,named_struct,json_* to Scala and Python

2023-06-29 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new de8ec74f282 [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, 
cardinality, count_min_sketch,mask,named_struct,json_* to Scala and Python
de8ec74f282 is described below

commit de8ec74f2826db3815275b4fccef186f22c85833
Author: Tengfei Huang 
AuthorDate: Thu Jun 29 22:35:31 2023 -0700

[SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, 
count_min_sketch,mask,named_struct,json_* to Scala and Python

### What changes were proposed in this pull request?

Add following functions:
- array_agg
- array_size
- cardinality
- count_min_sketch
- named_struct
- json_array_length
- json_object_keys
- mask

To:

- Scala API
- Python API
- Spark Connect Scala Client
- Spark Connect Python Client

### Why are the changes needed?
Add Scala, Python and Connect APIs for these SQL functions: array_agg, array_size, cardinality, count_min_sketch, named_struct, json_array_length, json_object_keys, mask.
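
A hedged usage sketch of a few of the new Scala API entries, assuming the usual Column-based signatures; the sample data and column names are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_agg, array_size, col, mask}

object NewFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, Seq("a", "b"), "AbC-123"), (2, Seq("c"), "xYz-987"))
      .toDF("id", "tags", "secret")

    // array_size counts elements; mask obscures upper/lower-case letters and digits.
    df.select(array_size(col("tags")), mask(col("secret"))).show()
    // array_agg collects a column's values into an array (aggregate function).
    df.agg(array_agg(col("id"))).show()

    spark.stop()
  }
}
```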

### Does this PR introduce _any_ user-facing change?
Yes, added new functions.

### How was this patch tested?
New UT added.

Closes #41718 from ivoson/SPARK-43926.

Lead-authored-by: Tengfei Huang 
Co-authored-by: Tengfei Huang 
Signed-off-by: Ruifeng Zheng 
---
 .../scala/org/apache/spark/sql/functions.scala | 159 +
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  48 
 .../explain-results/function_array_agg.explain |   2 +
 .../explain-results/function_array_size.explain|   2 +
 .../explain-results/function_cardinality.explain   |   2 +
 .../function_count_min_sketch.explain  |   2 +
 .../function_json_array_length.explain |   2 +
 .../function_json_object_keys.explain  |   2 +
 .../explain-results/function_mask.explain  |   2 +
 .../function_mask_with_specific_upperChar.explain  |   2 +
 ..._mask_with_specific_upperChar_lowerChar.explain |   2 +
 ..._specific_upperChar_lowerChar_digitChar.explain |   2 +
 ...upperChar_lowerChar_digitChar_otherChar.explain |   2 +
 .../explain-results/function_named_struct.explain  |   2 +
 .../query-tests/queries/function_array_agg.json|  25 ++
 .../queries/function_array_agg.proto.bin   | Bin 0 -> 178 bytes
 .../query-tests/queries/function_array_size.json   |  25 ++
 .../queries/function_array_size.proto.bin  | Bin 0 -> 179 bytes
 .../query-tests/queries/function_cardinality.json  |  25 ++
 .../queries/function_cardinality.proto.bin | Bin 0 -> 180 bytes
 .../queries/function_count_min_sketch.json |  37 +++
 .../queries/function_count_min_sketch.proto.bin| Bin 0 -> 217 bytes
 .../queries/function_json_array_length.json|  25 ++
 .../queries/function_json_array_length.proto.bin   | Bin 0 -> 186 bytes
 .../queries/function_json_object_keys.json |  25 ++
 .../queries/function_json_object_keys.proto.bin| Bin 0 -> 185 bytes
 .../query-tests/queries/function_mask.json |  25 ++
 .../query-tests/queries/function_mask.proto.bin| Bin 0 -> 173 bytes
 .../function_mask_with_specific_upperChar.json |  29 +++
 ...function_mask_with_specific_upperChar.proto.bin | Bin 0 -> 180 bytes
 ...ion_mask_with_specific_upperChar_lowerChar.json |  33 +++
 ...ask_with_specific_upperChar_lowerChar.proto.bin | Bin 0 -> 187 bytes
 ...ith_specific_upperChar_lowerChar_digitChar.json |  37 +++
 ...pecific_upperChar_lowerChar_digitChar.proto.bin | Bin 0 -> 194 bytes
 ...ic_upperChar_lowerChar_digitChar_otherChar.json |  41 
 ...perChar_lowerChar_digitChar_otherChar.proto.bin | Bin 0 -> 201 bytes
 .../query-tests/queries/function_named_struct.json |  37 +++
 .../queries/function_named_struct.proto.bin| Bin 0 -> 203 bytes
 .../source/reference/pyspark.sql/functions.rst |   8 +
 python/pyspark/sql/connect/functions.py|  74 ++
 python/pyspark/sql/functions.py| 254 +
 .../scala/org/apache/spark/sql/functions.scala | 180 +++
 .../spark/sql/CountMinSketchAggQuerySuite.scala|  19 ++
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  12 +
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |  85 ++-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  21 ++
 46 files changed, 1247 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index ed0c13b2145..2c1a966dc71 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 

[spark] branch master updated: [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits

2023-06-29 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e505244460b [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits
e505244460b is described below

commit e505244460baa49f862d36333792c9d924cb4dde
Author: Xinrong Meng 
AuthorDate: Thu Jun 29 14:55:03 2023 -0700

[SPARK-44150][PYTHON][FOLLOW-UP] Revert commits

### What changes were proposed in this pull request?
Revert two commits of [SPARK-44150] that block master CI.

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
N/A

Closes #41799 from xinrong-meng/revert.

Authored-by: Xinrong Meng 
Signed-off-by: Xinrong Meng 
---
 python/pyspark/sql/pandas/serializers.py  | 32 +++
 python/pyspark/sql/tests/test_arrow_python_udf.py | 39 ---
 python/pyspark/worker.py  |  3 --
 3 files changed, 5 insertions(+), 69 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 12d4c3077fe..307fcc33752 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -190,7 +190,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 )
 return converter(s)
 
-def _create_array(self, series, arrow_type, spark_type=None, 
arrow_cast=False):
+def _create_array(self, series, arrow_type, spark_type=None):
 """
 Create an Arrow Array from the given pandas.Series and optional type.
 
@@ -202,9 +202,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 If None, pyarrow's inferred type will be used
 spark_type : DataType, optional
 If None, spark type converted from arrow_type will be used
-arrow_cast: bool, optional
-Whether to apply Arrow casting when the user-specified return type 
mismatches the
-actual return values.
 
 Returns
 ---
@@ -229,14 +226,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 else:
 mask = series.isnull()
 try:
-if arrow_cast:
-return pa.Array.from_pandas(series, mask=mask).cast(
-target_type=arrow_type, safe=self._safecheck
-)
-else:
-return pa.Array.from_pandas(
-series, mask=mask, type=arrow_type, safe=self._safecheck
-)
+return pa.Array.from_pandas(series, mask=mask, type=arrow_type, 
safe=self._safecheck)
 except TypeError as e:
 error_msg = (
 "Exception thrown when converting pandas.Series (%s) "
@@ -329,14 +319,12 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 df_for_struct=False,
 struct_in_pandas="dict",
 ndarray_as_list=False,
-arrow_cast=False,
 ):
 super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, 
safecheck)
 self._assign_cols_by_name = assign_cols_by_name
 self._df_for_struct = df_for_struct
 self._struct_in_pandas = struct_in_pandas
 self._ndarray_as_list = ndarray_as_list
-self._arrow_cast = arrow_cast
 
 def arrow_to_pandas(self, arrow_column):
 import pyarrow.types as types
@@ -398,13 +386,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 # Assign result columns by schema name if user labeled with 
strings
 elif self._assign_cols_by_name and any(isinstance(name, str) 
for name in s.columns):
 arrs_names = [
-(
-self._create_array(
-s[field.name], field.type, 
arrow_cast=self._arrow_cast
-),
-field.name,
-)
-for field in t
+(self._create_array(s[field.name], field.type), 
field.name) for field in t
 ]
 # Assign result columns by  position
 else:
@@ -412,11 +394,7 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 # the selected series has name '1', so we rename it to 
field.name
 # as the name is used by _create_array to provide a 
meaningful error message
 (
-self._create_array(
-s[s.columns[i]].rename(field.name),
-field.type,
-arrow_cast=self._arrow_cast,
-),
+  

[spark] branch master updated: [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2

2023-06-29 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7142e04639c [SPARK-44248][SS][SQL][KAFKA] Add preferred location in 
kafka source v2
7142e04639c is described below

commit 7142e04639c1481e41ad499e657b2c62120fe763
Author: Siying Dong 
AuthorDate: Fri Jun 30 06:50:24 2023 +0900

[SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2

### What changes were proposed in this pull request?
In KafkaBatchInputPartition, which is used by the Kafka v2 source, preferredLocations() now returns the location that was already pre-calculated.

### Why are the changes needed?
The DSv2 Kafka streaming source misses setting the preferred location, which defeats the purpose of the cache for the Kafka consumer (connection) and fetched data. For DSv1, the preferred location is set in the RDD; in DSv2 this information was not returned.
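
A minimal sketch of the DSv2 hook involved, assuming the pre-computed location is carried on the partition; apart from `InputPartition` and `preferredLocations`, the names are illustrative.

```scala
import org.apache.spark.sql.connector.read.InputPartition

// The scheduler uses this hint to place the reader task on the executor that already
// holds the cached Kafka consumer (connection) and fetched data for the partition.
case class SketchKafkaInputPartition(preComputedLoc: Option[String]) extends InputPartition {
  override def preferredLocations(): Array[String] =
    preComputedLoc.map(Array(_)).getOrElse(Array.empty)
}
```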

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Some manual verification.

Closes #41790 from siying/kafkav2loc.

Authored-by: Siying Dong 
Signed-off-by: Jungtaek Lim 
---
 .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala   | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 508f5c7036b..97c8592d1da 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -34,7 +34,11 @@ private[kafka010] case class KafkaBatchInputPartition(
 executorKafkaParams: ju.Map[String, Object],
 pollTimeoutMs: Long,
 failOnDataLoss: Boolean,
-includeHeaders: Boolean) extends InputPartition
+includeHeaders: Boolean) extends InputPartition {
+  override def preferredLocations(): Array[String] = {
+offsetRange.preferredLoc.map(Array(_)).getOrElse(Array())
+  }
+}
 
 private[kafka010] object KafkaBatchReaderFactory extends 
PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {





[spark] branch master updated (6e56cfeaca8 -> 414bc75ac5b)

2023-06-29 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6e56cfeaca8 [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for 
mismatched return type in Arrow Python UDF
 add 414bc75ac5b [SPARK-44150][PYTHON][FOLLOW-UP] Fix 
ArrowStreamPandasSerializer to set arguments properly

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/pandas/serializers.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)





[spark] branch master updated: [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return type in Arrow Python UDF

2023-06-29 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6e56cfeaca8 [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for 
mismatched return type in Arrow Python UDF
6e56cfeaca8 is described below

commit 6e56cfeaca884b1ccfaa8524c70f12f118bc840c
Author: Xinrong Meng 
AuthorDate: Thu Jun 29 11:46:06 2023 -0700

[SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return 
type in Arrow Python UDF

### What changes were proposed in this pull request?
Explicit Arrow casting for the mismatched return type of Arrow Python UDF.

### Why are the changes needed?
A more standardized and coherent type coercion.

Please refer to https://github.com/apache/spark/pull/41706 for a comprehensive comparison between the type coercion rules of Arrow and of Pickle (used by the default Python UDF).

See more at [[Design] Type-coercion in Arrow Python 
UDFs](https://docs.google.com/document/d/e/2PACX-1vTEGElOZfhl9NfgbBw4CTrlm-8F_xQCAKNOXouz-7mg5vYobS7lCGUsGkDZxPY0wV5YkgoZmkYlxccU/pub).

### Does this PR introduce _any_ user-facing change?
Yes.

FROM
```py
>>> df = spark.createDataFrame(['1', '2'], schema='string')
>>> df.select(pandas_udf(lambda x: x, 'int')('value')).show()
...
org.apache.spark.api.python.PythonException: Traceback (most recent call 
last):
...
pyarrow.lib.ArrowInvalid: Could not convert '1' with type str: tried to 
convert to int32
```

TO
```py
>>> df = spark.createDataFrame(['1', '2'], schema='string')
>>> df.select(pandas_udf(lambda x: x, 'int')('value')).show()
+---------------+
|<lambda>(value)|
+---------------+
|              1|
|              2|
+---------------+
```

### How was this patch tested?
Unit tests.

Closes #41503 from xinrong-meng/type_coersion.

Authored-by: Xinrong Meng 
Signed-off-by: Xinrong Meng 
---
 python/pyspark/sql/pandas/serializers.py  | 30 ++---
 python/pyspark/sql/tests/test_arrow_python_udf.py | 39 +++
 python/pyspark/worker.py  |  3 ++
 3 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index 307fcc33752..a99eda9cbea 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -190,7 +190,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 )
 return converter(s)
 
-def _create_array(self, series, arrow_type, spark_type=None):
+def _create_array(self, series, arrow_type, spark_type=None, 
arrow_cast=False):
 """
 Create an Arrow Array from the given pandas.Series and optional type.
 
@@ -202,6 +202,9 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 If None, pyarrow's inferred type will be used
 spark_type : DataType, optional
 If None, spark type converted from arrow_type will be used
+arrow_cast: bool, optional
+Whether to apply Arrow casting when the user-specified return type 
mismatches the
+actual return values.
 
 Returns
 ---
@@ -226,7 +229,12 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
 else:
 mask = series.isnull()
 try:
-return pa.Array.from_pandas(series, mask=mask, type=arrow_type, 
safe=self._safecheck)
+if arrow_cast:
+return pa.Array.from_pandas(series, mask=mask, 
type=arrow_type).cast(
+target_type=arrow_type, safe=self._safecheck
+)
+else:
+return pa.Array.from_pandas(series, mask=mask, 
safe=self._safecheck)
 except TypeError as e:
 error_msg = (
 "Exception thrown when converting pandas.Series (%s) "
@@ -319,12 +327,14 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 df_for_struct=False,
 struct_in_pandas="dict",
 ndarray_as_list=False,
+arrow_cast=False,
 ):
 super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, 
safecheck)
 self._assign_cols_by_name = assign_cols_by_name
 self._df_for_struct = df_for_struct
 self._struct_in_pandas = struct_in_pandas
 self._ndarray_as_list = ndarray_as_list
+self._arrow_cast = arrow_cast
 
 def arrow_to_pandas(self, arrow_column):
 import pyarrow.types as types
@@ -386,7 +396,13 @@ class 
ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
 # Assign result columns by schema name if user labeled with 

[spark] branch master updated: [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID

2023-06-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c9734008401 [SPARK-43474][SS][CONNECT] Add a spark connect access to 
runtime Dataframes by ID
c9734008401 is described below

commit c9734008401ce7adfb154cda5496c808b2d76580
Author: Raghu Angadi 
AuthorDate: Thu Jun 29 09:25:15 2023 -0700

[SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes 
by ID

[This is a continuation of #41146, to change the author of the PR. Retains 
the description.]

### What changes were proposed in this pull request?

This change adds a new spark connect relation type `CachedRemoteRelation`, 
which can represent a DataFrame that's been cached on the server side.

On the server side, each `SessionHolder` has a cache that maintains a mapping from DataFrame ID to the actual DataFrame.

On the client side, a new relation type and a new function are added. The function creates a DataFrame reference given a key, where the key is the ID of a cached DataFrame that is usually passed from the server to the client. When transforming the DataFrame reference, the server looks up the actual DataFrame in the cache and substitutes it.

One use case of this function is streaming foreachBatch(). The server needs to call the user function for every batch, and that function takes a DataFrame as an argument. With the new function, we can cache the DataFrame on the server and pass the ID back to the client, which can then create the DataFrame reference.
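
A hedged sketch of the server-side idea: keep a per-session map from a generated ID to a cached DataFrame, hand the ID to the client, and resolve the reference back to the real DataFrame when the plan is transformed. The class and method names below are illustrative, not the actual `SessionHolder` API.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.DataFrame

class DataFrameCacheSketch {
  private val cache = new ConcurrentHashMap[String, DataFrame]()

  // Cache a DataFrame and return the ID the client would embed in a
  // CachedRemoteRelation(relation_id = id).
  def cacheDataFrame(df: DataFrame): String = {
    val id = UUID.randomUUID().toString
    cache.put(id, df)
    id
  }

  // Called while transforming the relation: swap the reference for the real DataFrame.
  def resolve(relationId: String): DataFrame =
    Option(cache.get(relationId)).getOrElse(
      throw new IllegalArgumentException(s"No cached DataFrame for id $relationId"))
}
```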

### Why are the changes needed?

This change is needed to support streaming foreachBatch() in Spark Connect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Scala unit test.
Manual test.
(More end-to-end tests will be added when foreachBatch() is supported. Currently there is no way to add a DataFrame to the server cache using Python.)

Closes #41580 from rangadi/df-ref.

Authored-by: Raghu Angadi 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/relations.proto|   7 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |   8 +
 .../spark/sql/connect/service/SessionHolder.scala  |  32 +++
 .../service/SparkConnectSessionHodlerSuite.scala   |  82 +++
 python/pyspark/sql/connect/plan.py |  14 ++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 270 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  28 +++
 7 files changed, 313 insertions(+), 128 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index d29ab02f86a..29405a1332b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -70,6 +70,7 @@ message Relation {
 ApplyInPandasWithState apply_in_pandas_with_state = 34;
 HtmlString html_string = 35;
 CachedLocalRelation cached_local_relation = 36;
+CachedRemoteRelation cached_remote_relation = 37;
 
 // NA functions
 NAFill fill_na = 90;
@@ -398,6 +399,12 @@ message CachedLocalRelation {
   string hash = 3;
 }
 
+// Represents a remote relation that has been cached on server.
+message CachedRemoteRelation {
+  // (Required) ID of the remote related (assigned by the service).
+  string relation_id = 1;
+}
+
 // Relation of type [[Sample]] that samples a fraction of the dataset.
 message Sample {
   // (Required) Input relation for a Sample.
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cecf14a7045..cdad4fc6190 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -162,6 +162,8 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) 
extends Logging {
 transformCoGroupMap(rel.getCoGroupMap)
   case proto.Relation.RelTypeCase.APPLY_IN_PANDAS_WITH_STATE =>
 transformApplyInPandasWithState(rel.getApplyInPandasWithState)
+  case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION =>
+transformCachedRemoteRelation(rel.getCachedRemoteRelation)
   case proto.Relation.RelTypeCase.COLLECT_METRICS =>
 transformCollectMetrics(rel.getCollectMetrics)
   case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
@@ -897,6 +899,12 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
   .logicalPlan
   }
 
+ 

[spark] branch master updated: [SPARK-44227][SQL] Extract SchemaUtils from StructField

2023-06-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0ae86c0492c [SPARK-44227][SQL] Extract SchemaUtils from StructField
0ae86c0492c is described below

commit 0ae86c0492cdeac6240ea8283f5bd93a8bf6e7a8
Author: Rui Wang 
AuthorDate: Thu Jun 29 09:03:35 2023 -0700

[SPARK-44227][SQL] Extract SchemaUtils from StructField

### What changes were proposed in this pull request?

Extract SchemaUtils from StructField so that StructField does not require anything from Catalyst.

### Why are the changes needed?

To remove StructField's dependency on Catalyst.
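
A self-contained sketch of the escaping behavior being moved (the backslashes in the quoted diff below appear to have been stripped by the archive): control characters in a field name become visible escape sequences when a schema is pretty-printed.

```scala
object EscapeMetaCharactersSketch {
  // Each control character is rendered as a literal two-character escape sequence.
  def escapeMetaCharacters(str: String): String =
    str.replace("\n", "\\n")
      .replace("\r", "\\r")
      .replace("\t", "\\t")
      .replace("\f", "\\f")
      .replace("\b", "\\b")
      .replace("\u000B", "\\v")
      .replace("\u0007", "\\a")

  def main(args: Array[String]): Unit = {
    // Prints: col\tname\nwith control chars (escapes shown as literal text).
    println(escapeMetaCharacters("col\tname\nwith control chars"))
  }
}
```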

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #41772 from amaliujia/move_schema_utils.

Authored-by: Rui Wang 
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/util/SparkSchemaUtils.scala   | 36 ++
 .../org/apache/spark/sql/types/StructField.scala   |  4 +--
 .../org/apache/spark/sql/util/SchemaUtils.scala| 11 ++-
 3 files changed, 40 insertions(+), 11 deletions(-)

diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala
new file mode 100644
index 000..efb833f0a2b
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+/**
+ * Utils for handling schemas.
+ */
+private[spark] object SparkSchemaUtils {
+  /**
+   * @param str The string to be escaped.
+   * @return The escaped string.
+   */
+  def escapeMetaCharacters(str: String): String = {
+str.replaceAll("\n", "\\\\n")
+  .replaceAll("\r", "\\\\r")
+  .replaceAll("\t", "\\\\t")
+  .replaceAll("\f", "\\\\f")
+  .replaceAll("\b", "\\\\b")
+  .replaceAll("\u000B", "\\\\v")
+  .replaceAll("\u0007", "\\\\a")
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index e03b2e8ab3c..dd267ed763e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -24,7 +24,7 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, 
quoteIfNeeded}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.catalyst.util.StringConcat
-import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.util.SparkSchemaUtils
 
 /**
  * A field inside a StructType.
@@ -51,7 +51,7 @@ case class StructField(
   stringConcat: StringConcat,
   maxDepth: Int): Unit = {
 if (maxDepth > 0) {
-  stringConcat.append(s"$prefix-- 
${SchemaUtils.escapeMetaCharacters(name)}: " +
+  stringConcat.append(s"$prefix-- 
${SparkSchemaUtils.escapeMetaCharacters(name)}: " +
 s"${dataType.typeName} (nullable = $nullable)\n")
   DataType.buildFormattedString(dataType, s"$prefix|", stringConcat, 
maxDepth)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index d202900381a..4b0e5308e65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, NamedExpress
 import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, NamedTransform, Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, 
StructType}
+import 

[spark] branch master updated: [SPARK-44208][CORE][SQL] Assign clear error class names for some logic that directly uses exceptions

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a9129defc0e [SPARK-44208][CORE][SQL] Assign clear error class names 
for some logic that directly uses exceptions
a9129defc0e is described below

commit a9129defc0ebbe68f20ec888352c30a90925d7ea
Author: panbingkun 
AuthorDate: Thu Jun 29 17:31:03 2023 +0300

[SPARK-44208][CORE][SQL] Assign clear error class names for some logic that 
directly uses exceptions

### What changes were proposed in this pull request?
The PR aims to assign clear error class names to some logic that directly uses exceptions, including:
- ALL_PARTITION_COLUMNS_NOT_ALLOWED
- INVALID_HIVE_COLUMN_NAME
- SPECIFY_BUCKETING_IS_NOT_ALLOWED
- SPECIFY_PARTITION_IS_NOT_ALLOWED
- UNSUPPORTED_ADD_FILE.DIRECTORY
- UNSUPPORTED_ADD_FILE.LOCAL_DIRECTORY

### Why are the changes needed?
The changes improve the error framework.
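
A self-contained sketch of the error-class idea: a stable name plus a message template with named placeholders instead of an ad-hoc exception string. The placeholder names below are illustrative (the archive stripped the actual placeholders from the JSON), and the case class is not Spark's internal representation.

```scala
final case class ErrorClassSketch(name: String, messageTemplate: String) {
  // Substitute <placeholder> tokens with the supplied parameter values.
  def format(params: Map[String, String]): String =
    params.foldLeft(messageTemplate) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }
}

object ErrorClassSketchDemo {
  def main(args: Array[String]): Unit = {
    val invalidHiveColumn = ErrorClassSketch(
      "INVALID_HIVE_COLUMN_NAME",
      "Cannot create the table <tableName> having the nested column <columnName> " +
        "whose name contains invalid characters <invalidChars> in Hive metastore.")

    println(invalidHiveColumn.format(Map(
      "tableName" -> "`db`.`tbl`",
      "columnName" -> "`a.b`",
      "invalidChars" -> "','")))
  }
}
```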

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Update UT.
- Pass GA.

Closes #41740 from panbingkun/assign_new_name.

Lead-authored-by: panbingkun 
Co-authored-by: panbingkun <84731...@qq.com>
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json| 42 +++---
 .../main/scala/org/apache/spark/SparkContext.scala |  7 ++--
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 14 
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../spark/sql/execution/datasources/rules.scala| 16 +
 .../spark/sql/execution/command/DDLSuite.scala | 34 +-
 .../spark/sql/hive/HiveExternalCatalog.scala   | 12 ---
 .../spark/sql/hive/execution/HiveDDLSuite.scala| 12 +++
 8 files changed, 97 insertions(+), 42 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 192a0747dfd..6db8c5e3bf1 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -4,6 +4,11 @@
   "Non-deterministic expression  should not appear in the 
arguments of an aggregate function."
 ]
   },
+  "ALL_PARTITION_COLUMNS_NOT_ALLOWED" : {
+"message" : [
+  "Cannot use all columns for partition columns."
+]
+  },
   "ALTER_TABLE_COLUMN_DESCRIPTOR_DUPLICATE" : {
 "message" : [
   "ALTER TABLE  column  specifies descriptor 
\"\" more than once, which is invalid."
@@ -1180,6 +1185,11 @@
 ],
 "sqlState" : "22023"
   },
+  "INVALID_HIVE_COLUMN_NAME" : {
+"message" : [
+  "Cannot create the table  having the nested column 
 whose name contains invalid characters  in Hive 
metastore."
+]
+  },
   "INVALID_IDENTIFIER" : {
 "message" : [
   "The identifier  is invalid. Please, consider quoting it with 
back-quotes as ``."
@@ -2081,6 +2091,16 @@
   "sortBy must be used together with bucketBy."
 ]
   },
+  "SPECIFY_BUCKETING_IS_NOT_ALLOWED" : {
+"message" : [
+  "Cannot specify bucketing information if the table schema is not 
specified when creating and will be inferred at runtime."
+]
+  },
+  "SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
+"message" : [
+  "It is not allowed to specify partition columns when the table schema is 
not defined. When the table schema is not provided, schema and partition 
columns will be inferred."
+]
+  },
   "SQL_CONF_NOT_FOUND" : {
 "message" : [
   "The SQL config  cannot be found. Please verify that the config 
exists."
@@ -2303,6 +2323,23 @@
   "Attempted to unset non-existent properties [] in table 
."
 ]
   },
+  "UNSUPPORTED_ADD_FILE" : {
+"message" : [
+  "Don't support add file."
+],
+"subClass" : {
+  "DIRECTORY" : {
+"message" : [
+  "The file  is a directory, consider to set 
\"spark.sql.legacy.addSingleFileInAddFile\" to \"false\"."
+]
+  },
+  "LOCAL_DIRECTORY" : {
+"message" : [
+  "The local directory  is not supported in a non-local master 
mode."
+]
+  }
+}
+  },
   "UNSUPPORTED_ARROWTYPE" : {
 "message" : [
   "Unsupported arrow type ."
@@ -3588,11 +3625,6 @@
   "Cannot use  for partition column."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1154" : {
-"message" : [
-  "Cannot use all columns for partition columns."
-]
-  },
   "_LEGACY_ERROR_TEMP_1155" : {
 "message" : [
   "Partition column `` not found in schema ."
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 24d788ff5bc..78c7ecb2782 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ 

[spark] branch branch-3.4 updated: [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE mode with corrupt record

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new ad29290a02f [SPARK-44079][SQL][3.4] Fix 
`ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE 
mode with corrupt record
ad29290a02f is described below

commit ad29290a02fb94a958fd21e301100338c9f5b82a
Author: Jia Fan 
AuthorDate: Thu Jun 29 16:38:02 2023 +0300

[SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parse 
array as struct using PERMISSIVE mode with corrupt record

### What changes were proposed in this pull request?
Cherry-pick #41662 to fix the parse-array-as-struct bug on branch-3.4.

### Why are the changes needed?
Fix the bug when parsing an array as a struct using PERMISSIVE mode with a corrupt record.
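
A hedged reproduction sketch of the scenario this fix targets: reading a top-level JSON array as rows of a struct schema in PERMISSIVE mode when one element is corrupt. The sample input and schema are assumptions; the exact failing case in the JIRA may differ.

```scala
import org.apache.spark.sql.SparkSession

object PermissiveArrayParseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // A top-level JSON array where one element does not match the expected struct.
    val ds = Seq("""[{"a": 1}, {"a": "not-an-int"}]""").toDS()

    spark.read
      .schema("a INT, _corrupt_record STRING")
      .option("mode", "PERMISSIVE")
      .json(ds)
      .show(truncate = false) // previously this path could throw ArrayIndexOutOfBoundsException

    spark.stop()
  }
}
```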

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.

Closes #41784 from Hisoka-X/SPARK-44079_3.4_cherry_pick.

Authored-by: Jia Fan 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala |  4 ++--
 .../spark/sql/catalyst/json/JacksonParser.scala  | 20 +++-
 .../spark/sql/catalyst/util/BadRecordException.scala | 14 --
 .../spark/sql/catalyst/util/FailureSafeParser.scala  |  9 +++--
 .../sql/execution/datasources/json/JsonSuite.scala   | 15 +++
 5 files changed, 51 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 42e03630b14..b58649da61c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -318,7 +318,7 @@ class UnivocityParser(
 if (tokens == null) {
   throw BadRecordException(
 () => getCurrentInput,
-() => None,
+() => Array.empty,
 QueryExecutionErrors.malformedCSVRecordError(""))
 }
 
@@ -362,7 +362,7 @@ class UnivocityParser(
 } else {
   if (badRecordException.isDefined) {
 throw BadRecordException(
-  () => currentInput, () => requiredRow.headOption, 
badRecordException.get)
+  () => currentInput, () => Array(requiredRow.get), 
badRecordException.get)
   } else {
 requiredRow
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bf07d65caa0..d9bff3dc7ec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -135,7 +135,7 @@ class JacksonParser(
 // List([str_a_2,null], [null,str_b_3])
 //
   case START_ARRAY if allowArrayAsStructs =>
-val array = convertArray(parser, elementConverter, isRoot = true)
+val array = convertArray(parser, elementConverter, isRoot = true, 
arrayAsStructs = true)
 // Here, as we support reading top level JSON arrays and take every 
element
 // in such an array as a row, this case is possible.
 if (array.numElements() == 0) {
@@ -517,7 +517,8 @@ class JacksonParser(
   private def convertArray(
   parser: JsonParser,
   fieldConverter: ValueConverter,
-  isRoot: Boolean = false): ArrayData = {
+  isRoot: Boolean = false,
+  arrayAsStructs: Boolean = false): ArrayData = {
 val values = ArrayBuffer.empty[Any]
 var badRecordException: Option[Throwable] = None
 
@@ -537,6 +538,9 @@ class JacksonParser(
 
 if (badRecordException.isEmpty) {
   arrayData
+} else if (arrayAsStructs) {
+  throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
+badRecordException.get)
 } else {
   throw PartialResultException(InternalRow(arrayData), 
badRecordException.get)
 }
@@ -570,7 +574,7 @@ class JacksonParser(
 // JSON parser currently doesn't support partial results for corrupted 
records.
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
-throw BadRecordException(() => recordLiteral(record), () => None, e)
+throw BadRecordException(() => recordLiteral(record), () => 
Array.empty, e)
   case e: CharConversionException if options.encoding.isEmpty =>
 val msg =
   """JSON parser cannot handle a character in its input.
@@ -578,11 +582,17 @@ class JacksonParser(
 |""".stripMargin + e.getMessage
 val wrappedCharException = new CharConversionException(msg)

[spark] branch master updated: [MINOR][TESTS] Fix potential bug for AlterTableTest

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6511a3e9020 [MINOR][TESTS] Fix potential bug for AlterTableTest
6511a3e9020 is described below

commit 6511a3e90206473985c2d6fd28d06eb7bcf8c98f
Author: panbingkun 
AuthorDate: Thu Jun 29 12:28:03 2023 +0300

[MINOR][TESTS] Fix potential bug for AlterTableTest

### What changes were proposed in this pull request?
The PR aims to fix a potential bug in `AlterTableTest`.

### Why are the changes needed?
Fix bug.
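
For reference, a minimal illustration of the interpolation bug visible in the diff: the helper hard-coded the literal `table_name` instead of interpolating its parameter.

```scala
object FullTableNameSketch {
  // Before: the tableName argument is ignored because "table_name" is a literal suffix.
  def buggy(catalogAndNamespace: String, tableName: String): String =
    s"${catalogAndNamespace}table_name"

  // After: both values are interpolated, matching the fix in AlterTableTests.scala.
  def fixed(catalogAndNamespace: String, tableName: String): String =
    s"$catalogAndNamespace$tableName"
}
```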

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manual test.
- Pass GA.

Closes #41783 from panbingkun/AlterTableTests_fix.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../spark/sql/connector/AlterTableTests.scala  | 373 +
 1 file changed, 164 insertions(+), 209 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 2047212a4ea..122b3ab07e6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -42,7 +42,7 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
 if (catalogAndNamespace.isEmpty) {
   s"default.$tableName"
 } else {
-  s"${catalogAndNamespace}table_name"
+  s"$catalogAndNamespace$tableName"
 }
   }
 
@@ -63,7 +63,7 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
   }
 
   test("AlterTable: change rejected by implementation") {
-val t = s"${catalogAndNamespace}table_name"
+val t = fullTableName("table_name")
 withTable(t) {
   sql(s"CREATE TABLE $t (id int) USING $v2Format")
 
@@ -74,38 +74,35 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
   assert(exc.getMessage.contains("Unsupported table change"))
   assert(exc.getMessage.contains("Cannot drop all fields")) // from the 
implementation
 
-  val tableName = fullTableName(t)
-  val table = getTableMetadata(tableName)
+  val table = getTableMetadata(t)
 
-  assert(table.name === tableName)
+  assert(table.name === t)
   assert(table.schema === new StructType().add("id", IntegerType))
 }
   }
 
   test("AlterTable: add top-level column") {
-val t = s"${catalogAndNamespace}table_name"
+val t = fullTableName("table_name")
 withTable(t) {
   sql(s"CREATE TABLE $t (id int) USING $v2Format")
   sql(s"ALTER TABLE $t ADD COLUMN data string")
 
-  val tableName = fullTableName(t)
-  val table = getTableMetadata(tableName)
+  val table = getTableMetadata(t)
 
-  assert(table.name === tableName)
+  assert(table.name === t)
   assert(table.schema === new StructType().add("id", 
IntegerType).add("data", StringType))
 }
   }
 
   test("AlterTable: add column with NOT NULL") {
-val t = s"${catalogAndNamespace}table_name"
+val t = fullTableName("table_name")
 withTable(t) {
   sql(s"CREATE TABLE $t (id int) USING $v2Format")
   sql(s"ALTER TABLE $t ADD COLUMN data string NOT NULL")
 
-  val tableName = fullTableName(t)
-  val table = getTableMetadata(tableName)
+  val table = getTableMetadata(t)
 
-  assert(table.name === tableName)
+  assert(table.name === t)
   assert(table.schema === StructType(Seq(
 StructField("id", IntegerType),
 StructField("data", StringType, nullable = false
@@ -113,15 +110,14 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
   }
 
   test("AlterTable: add column with comment") {
-val t = s"${catalogAndNamespace}table_name"
+val t = fullTableName("table_name")
 withTable(t) {
   sql(s"CREATE TABLE $t (id int) USING $v2Format")
   sql(s"ALTER TABLE $t ADD COLUMN data string COMMENT 'doc'")
 
-  val tableName = fullTableName(t)
-  val table = getTableMetadata(tableName)
+  val table = getTableMetadata(t)
 
-  assert(table.name === tableName)
+  assert(table.name === t)
   assert(table.schema === StructType(Seq(
 StructField("id", IntegerType),
 StructField("data", StringType).withComment("doc"
@@ -129,7 +125,7 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
   }
 
   test("AlterTable: add column with interval type") {
-val t = s"${catalogAndNamespace}table_name"
+val t = fullTableName("table_name")
 withTable(t) {
   sql(s"CREATE TABLE $t (id int, point struct) USING 
$v2Format")
   val e1 =
@@ -142,18 +138,17 @@ trait AlterTableTests extends SharedSparkSession with 

[spark] branch master updated: [SPARK-44169][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2300-2304]

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ffbd1a3b5b1 [SPARK-44169][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[2300-2304]
ffbd1a3b5b1 is described below

commit ffbd1a3b5b17386759a378dee5ef5cf6df7f2d09
Author: Jiaan Geng 
AuthorDate: Thu Jun 29 12:26:24 2023 +0300

[SPARK-44169][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[2300-2304]

### What changes were proposed in this pull request?
The PR aims to assign names to the error classes _LEGACY_ERROR_TEMP_[2300-2304].

### Why are the changes needed?
Improve the error framework.
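
A hedged example of a query that should now surface one of the newly named error classes instead of a `_LEGACY_ERROR_TEMP_*` code; the expected class in the comment is an assumption based on the JSON changes in this commit.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object InlineTableErrorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    try {
      // Mixing incompatible column types in an inline table is rejected at analysis time.
      spark.sql("SELECT * FROM VALUES (1), (true) AS t(c)").show()
    } catch {
      case e: AnalysisException =>
        // Expected: INVALID_INLINE_TABLE.INCOMPATIBLE_TYPES_IN_INLINE_TABLE (assumption).
        println(e.getErrorClass)
    }
    spark.stop()
  }
}
```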

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases were updated and new test cases added.

Closes #41719 from beliefer/SPARK-44169.

Authored-by: Jiaan Geng 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  74 ---
 .../catalyst/analysis/ResolveInlineTables.scala|  12 +-
 .../catalyst/analysis/higherOrderFunctions.scala   |  14 +-
 .../analysis/ResolveLambdaVariablesSuite.scala |  18 +-
 .../spark/sql/execution/datasources/rules.scala|   4 +-
 .../sql-tests/analyzer-results/cte.sql.out |   4 +-
 .../analyzer-results/inline-table.sql.out  |  12 +-
 .../analyzer-results/postgreSQL/boolean.sql.out|   2 +-
 .../postgreSQL/window_part3.sql.out|   2 +-
 .../postgreSQL/window_part4.sql.out|   2 +-
 .../analyzer-results/udf/udf-inline-table.sql.out  |  12 +-
 .../test/resources/sql-tests/results/cte.sql.out   |   4 +-
 .../sql-tests/results/inline-table.sql.out |  12 +-
 .../sql-tests/results/postgreSQL/boolean.sql.out   |   2 +-
 .../results/postgreSQL/window_part3.sql.out|   2 +-
 .../results/postgreSQL/window_part4.sql.out|   2 +-
 .../sql-tests/results/udf/udf-inline-table.sql.out |  12 +-
 .../apache/spark/sql/ColumnExpressionSuite.scala   |  33 +++-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 219 +++--
 19 files changed, 297 insertions(+), 145 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index e441686432a..192a0747dfd 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -704,11 +704,6 @@
 ],
 "sqlState" : "42K04"
   },
-  "FAILED_SQL_EXPRESSION_EVALUATION" : {
-"message" : [
-  "Failed to evaluate the SQL expression: . Please check your 
syntax and ensure all required tables and columns are available."
-]
-  },
   "FIELD_NOT_FOUND" : {
 "message" : [
   "No such struct field  in ."
@@ -1197,6 +1192,28 @@
 ],
 "sqlState" : "22003"
   },
+  "INVALID_INLINE_TABLE" : {
+"message" : [
+  "Invalid inline table."
+],
+"subClass" : {
+  "CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE" : {
+"message" : [
+  "Cannot evaluate the expression  in inline table definition."
+]
+  },
+  "FAILED_SQL_EXPRESSION_EVALUATION" : {
+"message" : [
+  "Failed to evaluate the SQL expression . Please check your 
syntax and ensure all required tables and columns are available."
+]
+  },
+  "INCOMPATIBLE_TYPES_IN_INLINE_TABLE" : {
+"message" : [
+  "Found incompatible types in the column  for inline table."
+]
+  }
+}
+  },
   "INVALID_JSON_ROOT_FIELD" : {
 "message" : [
   "Cannot convert JSON root field to target Spark type."
@@ -1209,6 +1226,23 @@
 ],
 "sqlState" : "22032"
   },
+  "INVALID_LAMBDA_FUNCTION_CALL" : {
+"message" : [
+  "Invalid lambda function call."
+],
+"subClass" : {
+  "DUPLICATE_ARG_NAMES" : {
+"message" : [
+  "The lambda function has duplicate arguments . Please, 
consider to rename the argument names or set  to \"true\"."
+]
+  },
+  "NUM_ARGS_MISMATCH" : {
+"message" : [
+  "A higher order function expects  arguments, but 
got ."
+]
+  }
+}
+  },
   "INVALID_LATERAL_JOIN_TYPE" : {
 "message" : [
   "The  JOIN with LATERAL correlation is not allowed because an 
OUTER subquery cannot correlate to its join partner. Remove the LATERAL 
correlation or use an INNER JOIN, or LEFT OUTER JOIN instead."
@@ -1654,6 +1688,11 @@
 ],
 "sqlState" : "42803"
   },
+  "MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : {
+"message" : [
+  "The expression  does not support more than one source."
+]
+  },
   "MULTI_UDF_INTERFACE_ERROR" : {
 "message" : [
   "Not allowed to implement multiple UDF interfaces, UDF class 
."
@@ -5492,31 +5531,6 @@
   

[spark-docker] branch master updated: [SPARK-40513] Add --batch to gpg command

2023-06-29 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 58d2885  [SPARK-40513] Add --batch to gpg command
58d2885 is described below

commit 58d288546e8419d229f14b62b6a653999e0390f1
Author: Yikun Jiang 
AuthorDate: Thu Jun 29 16:05:47 2023 +0800

[SPARK-40513] Add --batch to gpg command

### What changes were proposed in this pull request?
Add --batch to the gpg command, which essentially puts GnuPG into "API mode" instead of "UI mode".
Apply the change to the 3.4.x Dockerfiles.

### Why are the changes needed?
Address DOI comments: 
https://github.com/docker-library/official-images/pull/13089#issuecomment-1611814491

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI passed

Closes #51 from Yikun/batch.

Authored-by: Yikun Jiang 
Signed-off-by: Yikun Jiang 
---
 3.4.0/scala2.12-java11-ubuntu/Dockerfile | 4 ++--
 3.4.1/scala2.12-java11-ubuntu/Dockerfile | 4 ++--
 Dockerfile.template  | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/3.4.0/scala2.12-java11-ubuntu/Dockerfile 
b/3.4.0/scala2.12-java11-ubuntu/Dockerfile
index 854f86c..a4b081e 100644
--- a/3.4.0/scala2.12-java11-ubuntu/Dockerfile
+++ b/3.4.0/scala2.12-java11-ubuntu/Dockerfile
@@ -46,8 +46,8 @@ RUN set -ex; \
 wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \
 wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \
 export GNUPGHOME="$(mktemp -d)"; \
-gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
-gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \
+gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
+gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys 
"$GPG_KEY"; \
 gpg --batch --verify spark.tgz.asc spark.tgz; \
 gpgconf --kill all; \
 rm -rf "$GNUPGHOME" spark.tgz.asc; \
diff --git a/3.4.1/scala2.12-java11-ubuntu/Dockerfile 
b/3.4.1/scala2.12-java11-ubuntu/Dockerfile
index 6d62769..d8bba7e 100644
--- a/3.4.1/scala2.12-java11-ubuntu/Dockerfile
+++ b/3.4.1/scala2.12-java11-ubuntu/Dockerfile
@@ -46,8 +46,8 @@ RUN set -ex; \
 wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \
 wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \
 export GNUPGHOME="$(mktemp -d)"; \
-gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
-gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \
+gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
+gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys 
"$GPG_KEY"; \
 gpg --batch --verify spark.tgz.asc spark.tgz; \
 gpgconf --kill all; \
 rm -rf "$GNUPGHOME" spark.tgz.asc; \
diff --git a/Dockerfile.template b/Dockerfile.template
index 80b57e2..3d0aacf 100644
--- a/Dockerfile.template
+++ b/Dockerfile.template
@@ -46,8 +46,8 @@ RUN set -ex; \
 wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \
 wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \
 export GNUPGHOME="$(mktemp -d)"; \
-gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
-gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \
+gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \
+gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys 
"$GPG_KEY"; \
 gpg --batch --verify spark.tgz.asc spark.tgz; \
 gpgconf --kill all; \
 rm -rf "$GNUPGHOME" spark.tgz.asc; \





[spark-docker] branch master updated: [SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key fingerprint

2023-06-29 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 39264c5  [SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key 
fingerprint
39264c5 is described below

commit 39264c502cf21b71a1ab5da71760e5864abce099
Author: Yikun Jiang 
AuthorDate: Thu Jun 29 16:04:50 2023 +0800

[SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key fingerprint

### What changes were proposed in this pull request?

Change the GPG key from `34F0FC5C` to `F28C9C925C188C35E345614DEDA00CE834F0FC5C` to avoid a potential collision.

The full finger print can get from below cmd:
```
$ wget https://dist.apache.org/repos/dist/dev/spark/KEYS
$ gpg --import KEYS
$ gpg --fingerprint 34F0FC5C

pub   rsa4096 2015-05-05 [SC]
  F28C 9C92 5C18 8C35 E345  614D EDA0 0CE8 34F0 FC5C
uid   [ unknown] Dongjoon Hyun (CODE SIGNING KEY) 

sub   rsa4096 2015-05-05 [E]

```

### Why are the changes needed?

- A short GPG key had been added as the v3.4.0 GPG key in https://github.com/apache/spark-docker/pull/46.
- The short key `34F0FC5C` is from https://dist.apache.org/repos/dist/dev/spark/KEYS.
- According to DOI review comments (https://github.com/docker-library/official-images/pull/13089#issuecomment-1609990551): `this should be the full key fingerprint: F28C9C925C188C35E345614DEDA00CE834F0FC5C (generating a collision for such a short key ID is trivial).`
- We'd better switch the short key to the full fingerprint.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI passed

Closes #50 from Yikun/gpg_key.

Authored-by: Yikun Jiang 
Signed-off-by: Yikun Jiang 
---
 3.4.1/scala2.12-java11-ubuntu/Dockerfile | 2 +-
 tools/template.py| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/3.4.1/scala2.12-java11-ubuntu/Dockerfile 
b/3.4.1/scala2.12-java11-ubuntu/Dockerfile
index bf106a6..6d62769 100644
--- a/3.4.1/scala2.12-java11-ubuntu/Dockerfile
+++ b/3.4.1/scala2.12-java11-ubuntu/Dockerfile
@@ -38,7 +38,7 @@ RUN set -ex; \
 # https://downloads.apache.org/spark/KEYS
 ENV 
SPARK_TGZ_URL=https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
 \
 
SPARK_TGZ_ASC_URL=https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz.asc
 \
-GPG_KEY=34F0FC5C
+GPG_KEY=F28C9C925C188C35E345614DEDA00CE834F0FC5C
 
 RUN set -ex; \
 export SPARK_TMP="$(mktemp -d)"; \
diff --git a/tools/template.py b/tools/template.py
index 93e842a..cdc167c 100755
--- a/tools/template.py
+++ b/tools/template.py
@@ -31,7 +31,7 @@ GPG_KEY_DICT = {
 # issuer "xinr...@apache.org"
 "3.4.0": "CC68B3D16FE33A766705160BA7E57908C7A4E1B1",
 # issuer "dongj...@apache.org"
-"3.4.1": "34F0FC5C"
+"3.4.1": "F28C9C925C188C35E345614DEDA00CE834F0FC5C"
 }
 
 





[spark] branch master updated (af536459501 -> 70f34278cbf)

2023-06-29 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from af536459501 [SPARK-44237][CORE] Simplify DirectByteBuffer constructor 
lookup logic
 add 70f34278cbf [SPARK-44079][SQL] Fix `ArrayIndexOutOfBoundsException` 
when parse array as struct using PERMISSIVE mode with corrupt record

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/csv/UnivocityParser.scala |  4 ++--
 .../spark/sql/catalyst/json/JacksonParser.scala  | 20 +++-
 .../spark/sql/catalyst/util/BadRecordException.scala | 14 --
 .../spark/sql/catalyst/util/FailureSafeParser.scala  |  9 +++--
 .../sql/execution/datasources/json/JsonSuite.scala   | 15 +++
 5 files changed, 51 insertions(+), 11 deletions(-)

