[spark] branch master updated: [SPARK-44030][SQL][FOLLOW-UP] Move unapply from AnyTimestampType to AnyTimestampTypeExpression
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 618b52097c0 [SPARK-44030][SQL][FOLLOW-UP] Move unapply from AnyTimestampType to AnyTimestampTypeExpression 618b52097c0 is described below commit 618b52097c07105d734aaf9b2a22b372920b3f31 Author: Rui Wang AuthorDate: Fri Jun 30 08:38:39 2023 +0300 [SPARK-44030][SQL][FOLLOW-UP] Move unapply from AnyTimestampType to AnyTimestampTypeExpression ### What changes were proposed in this pull request? Move unapply from AnyTimestampType to AnyTimestampTypeExpression. ### Why are the changes needed? To align with the effort that we use separate type expression class to host `unapply`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing Test Closes #41771 from amaliujia/atomic_datatype_expression. Authored-by: Rui Wang Signed-off-by: Max Gekk --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- .../spark/sql/catalyst/analysis/AnsiTypeCoercion.scala | 14 -- .../apache/spark/sql/catalyst/analysis/TypeCoercion.scala | 12 +++- .../org/apache/spark/sql/types/AbstractDataType.scala | 3 --- .../org/apache/spark/sql/types/DataTypeExpression.scala| 5 + 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8a192a4c132..32cec909401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -428,8 +428,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor UnaryMinus(r, mode == EvalMode.ANSI), ansiEnabled = mode == EvalMode.ANSI)) case (_, 
CalendarIntervalType | _: DayTimeIntervalType) => Cast(DatetimeSub(l, r, TimeAdd(l, UnaryMinus(r, mode == EvalMode.ANSI))), l.dataType) - case _ if AnyTimestampType.unapply(l) || AnyTimestampType.unapply(r) => -SubtractTimestamps(l, r) + case _ if AnyTimestampTypeExpression.unapply(l) || +AnyTimestampTypeExpression.unapply(r) => SubtractTimestamps(l, r) case (_, DateType) => SubtractDates(l, r) case (DateType, dt) if dt != StringType => DateSub(l, r) case _ => s diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala index d3f20f87493..5854f42a061 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala @@ -284,7 +284,7 @@ object AnsiTypeCoercion extends TypeCoercionBase { // Skip nodes who's children have not been resolved yet. case g if !g.childrenResolved => g - case g: GetDateField if AnyTimestampType.unapply(g.child) => + case g: GetDateField if AnyTimestampTypeExpression.unapply(g.child) => g.withNewChildren(Seq(Cast(g.child, DateType))) } } @@ -294,14 +294,16 @@ object AnsiTypeCoercion extends TypeCoercionBase { // Skip nodes who's children have not been resolved yet. 
case e if !e.childrenResolved => e - case d @ DateAdd(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType)) - case d @ DateSub(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType)) + case d @ DateAdd(AnyTimestampTypeExpression(), _) => +d.copy(startDate = Cast(d.startDate, DateType)) + case d @ DateSub(AnyTimestampTypeExpression(), _) => +d.copy(startDate = Cast(d.startDate, DateType)) - case s @ SubtractTimestamps(DateTypeExpression(), AnyTimestampType(), _, _) => + case s @ SubtractTimestamps(DateTypeExpression(), AnyTimestampTypeExpression(), _, _) => s.copy(left = Cast(s.left, s.right.dataType)) - case s @ SubtractTimestamps(AnyTimestampType(), DateTypeExpression(), _, _) => + case s @ SubtractTimestamps(AnyTimestampTypeExpression(), DateTypeExpression(), _, _) => s.copy(right = Cast(s.right, s.left.dataType)) - case s @ SubtractTimestamps(AnyTimestampType(), AnyTimestampType(), _, _) + case s @ SubtractTimestamps(AnyTimestampTypeExpression(), AnyTimestampTypeExpression(), _, _) if s.left.dataType != s.right.dataType => val newLeft = castIfNotSameType(s.left, TimestampNTZType) val newRight = castIfNotSameType(s.right,
[spark] branch master updated: [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch,mask,named_struct,json_* to Scala and Python
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new de8ec74f282 [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch,mask,named_struct,json_* to Scala and Python de8ec74f282 is described below commit de8ec74f2826db3815275b4fccef186f22c85833 Author: Tengfei Huang AuthorDate: Thu Jun 29 22:35:31 2023 -0700 [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch,mask,named_struct,json_* to Scala and Python ### What changes were proposed in this pull request? Add following functions: - array_agg - array_size - cardinality - count_min_sketch - named_struct - json_array_length - json_object_keys - mask To: - Scala API - Python API - Spark Connect Scala Client - Spark Connect Python Client ### Why are the changes needed? Add Scala, Python and Connect API for these sql functions: array_agg, array_size, cardinality, count_min_sketch, named_struct, json_array_length, json_object_keys, mask ### Does this PR introduce _any_ user-facing change? Yes, added new functions. ### How was this patch tested? New UT added. Closes #41718 from ivoson/SPARK-43926. 
Lead-authored-by: Tengfei Huang Co-authored-by: Tengfei Huang Signed-off-by: Ruifeng Zheng --- .../scala/org/apache/spark/sql/functions.scala | 159 + .../apache/spark/sql/PlanGenerationTestSuite.scala | 48 .../explain-results/function_array_agg.explain | 2 + .../explain-results/function_array_size.explain| 2 + .../explain-results/function_cardinality.explain | 2 + .../function_count_min_sketch.explain | 2 + .../function_json_array_length.explain | 2 + .../function_json_object_keys.explain | 2 + .../explain-results/function_mask.explain | 2 + .../function_mask_with_specific_upperChar.explain | 2 + ..._mask_with_specific_upperChar_lowerChar.explain | 2 + ..._specific_upperChar_lowerChar_digitChar.explain | 2 + ...upperChar_lowerChar_digitChar_otherChar.explain | 2 + .../explain-results/function_named_struct.explain | 2 + .../query-tests/queries/function_array_agg.json| 25 ++ .../queries/function_array_agg.proto.bin | Bin 0 -> 178 bytes .../query-tests/queries/function_array_size.json | 25 ++ .../queries/function_array_size.proto.bin | Bin 0 -> 179 bytes .../query-tests/queries/function_cardinality.json | 25 ++ .../queries/function_cardinality.proto.bin | Bin 0 -> 180 bytes .../queries/function_count_min_sketch.json | 37 +++ .../queries/function_count_min_sketch.proto.bin| Bin 0 -> 217 bytes .../queries/function_json_array_length.json| 25 ++ .../queries/function_json_array_length.proto.bin | Bin 0 -> 186 bytes .../queries/function_json_object_keys.json | 25 ++ .../queries/function_json_object_keys.proto.bin| Bin 0 -> 185 bytes .../query-tests/queries/function_mask.json | 25 ++ .../query-tests/queries/function_mask.proto.bin| Bin 0 -> 173 bytes .../function_mask_with_specific_upperChar.json | 29 +++ ...function_mask_with_specific_upperChar.proto.bin | Bin 0 -> 180 bytes ...ion_mask_with_specific_upperChar_lowerChar.json | 33 +++ ...ask_with_specific_upperChar_lowerChar.proto.bin | Bin 0 -> 187 bytes ...ith_specific_upperChar_lowerChar_digitChar.json | 37 +++ 
...pecific_upperChar_lowerChar_digitChar.proto.bin | Bin 0 -> 194 bytes ...ic_upperChar_lowerChar_digitChar_otherChar.json | 41 ...perChar_lowerChar_digitChar_otherChar.proto.bin | Bin 0 -> 201 bytes .../query-tests/queries/function_named_struct.json | 37 +++ .../queries/function_named_struct.proto.bin| Bin 0 -> 203 bytes .../source/reference/pyspark.sql/functions.rst | 8 + python/pyspark/sql/connect/functions.py| 74 ++ python/pyspark/sql/functions.py| 254 + .../scala/org/apache/spark/sql/functions.scala | 180 +++ .../spark/sql/CountMinSketchAggQuerySuite.scala| 19 ++ .../apache/spark/sql/DataFrameAggregateSuite.scala | 12 + .../apache/spark/sql/DataFrameFunctionsSuite.scala | 85 ++- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 21 ++ 46 files changed, 1247 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index ed0c13b2145..2c1a966dc71 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++
[spark] branch master updated: [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits
This is an automated email from the ASF dual-hosted git repository. xinrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e505244460b [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits e505244460b is described below commit e505244460baa49f862d36333792c9d924cb4dde Author: Xinrong Meng AuthorDate: Thu Jun 29 14:55:03 2023 -0700 [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits ### What changes were proposed in this pull request? Revert two commits of [SPARK-44150] that block master CI. ### Why are the changes needed? N/A ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? N/A Closes #41799 from xinrong-meng/revert. Authored-by: Xinrong Meng Signed-off-by: Xinrong Meng --- python/pyspark/sql/pandas/serializers.py | 32 +++ python/pyspark/sql/tests/test_arrow_python_udf.py | 39 --- python/pyspark/worker.py | 3 -- 3 files changed, 5 insertions(+), 69 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 12d4c3077fe..307fcc33752 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -190,7 +190,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): ) return converter(s) -def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False): +def _create_array(self, series, arrow_type, spark_type=None): """ Create an Arrow Array from the given pandas.Series and optional type. @@ -202,9 +202,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): If None, pyarrow's inferred type will be used spark_type : DataType, optional If None, spark type converted from arrow_type will be used -arrow_cast: bool, optional -Whether to apply Arrow casting when the user-specified return type mismatches the -actual return values. 
Returns --- @@ -229,14 +226,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): else: mask = series.isnull() try: -if arrow_cast: -return pa.Array.from_pandas(series, mask=mask).cast( -target_type=arrow_type, safe=self._safecheck -) -else: -return pa.Array.from_pandas( -series, mask=mask, type=arrow_type, safe=self._safecheck -) +return pa.Array.from_pandas(series, mask=mask, type=arrow_type, safe=self._safecheck) except TypeError as e: error_msg = ( "Exception thrown when converting pandas.Series (%s) " @@ -329,14 +319,12 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): df_for_struct=False, struct_in_pandas="dict", ndarray_as_list=False, -arrow_cast=False, ): super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck) self._assign_cols_by_name = assign_cols_by_name self._df_for_struct = df_for_struct self._struct_in_pandas = struct_in_pandas self._ndarray_as_list = ndarray_as_list -self._arrow_cast = arrow_cast def arrow_to_pandas(self, arrow_column): import pyarrow.types as types @@ -398,13 +386,7 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): # Assign result columns by schema name if user labeled with strings elif self._assign_cols_by_name and any(isinstance(name, str) for name in s.columns): arrs_names = [ -( -self._create_array( -s[field.name], field.type, arrow_cast=self._arrow_cast -), -field.name, -) -for field in t +(self._create_array(s[field.name], field.type), field.name) for field in t ] # Assign result columns by position else: @@ -412,11 +394,7 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): # the selected series has name '1', so we rename it to field.name # as the name is used by _create_array to provide a meaningful error message ( -self._create_array( -s[s.columns[i]].rename(field.name), -field.type, -arrow_cast=self._arrow_cast, -), +
[spark] branch master updated: [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7142e04639c [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2 7142e04639c is described below commit 7142e04639c1481e41ad499e657b2c62120fe763 Author: Siying Dong AuthorDate: Fri Jun 30 06:50:24 2023 +0900 [SPARK-44248][SS][SQL][KAFKA] Add preferred location in kafka source v2 ### What changes were proposed in this pull request? In KafkaBatchInputPartition, which is used for Kafka v2 source, preferredLocations() now returns the location that has already been pre-calculated. ### Why are the changes needed? DSv2 Kafka streaming source seems to miss setting the preferred location, which may defeat the purpose of the cache for Kafka consumer (connection) & fetched data. For DSv1, we have set the preferred location in RDD. This information is not returned in DSv2. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Some manual verification. Closes #41790 from siying/kafkav2loc. 
Authored-by: Siying Dong Signed-off-by: Jungtaek Lim --- .../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala index 508f5c7036b..97c8592d1da 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala @@ -34,7 +34,11 @@ private[kafka010] case class KafkaBatchInputPartition( executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, failOnDataLoss: Boolean, -includeHeaders: Boolean) extends InputPartition +includeHeaders: Boolean) extends InputPartition { + override def preferredLocations(): Array[String] = { +offsetRange.preferredLoc.map(Array(_)).getOrElse(Array()) + } +} private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6e56cfeaca8 -> 414bc75ac5b)
This is an automated email from the ASF dual-hosted git repository. xinrong pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 6e56cfeaca8 [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return type in Arrow Python UDF add 414bc75ac5b [SPARK-44150][PYTHON][FOLLOW-UP] Fix ArrowStreamPandasSerializer to set arguments properly No new revisions were added by this update. Summary of changes: python/pyspark/sql/pandas/serializers.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return type in Arrow Python UDF
This is an automated email from the ASF dual-hosted git repository. xinrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6e56cfeaca8 [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return type in Arrow Python UDF 6e56cfeaca8 is described below commit 6e56cfeaca884b1ccfaa8524c70f12f118bc840c Author: Xinrong Meng AuthorDate: Thu Jun 29 11:46:06 2023 -0700 [SPARK-44150][PYTHON][CONNECT] Explicit Arrow casting for mismatched return type in Arrow Python UDF ### What changes were proposed in this pull request? Explicit Arrow casting for the mismatched return type of Arrow Python UDF. ### Why are the changes needed? A more standardized and coherent type coercion. Please refer to https://github.com/apache/spark/pull/41706 for a comprehensive comparison between type coercion rules of Arrow and Pickle(used by the default Python UDF) separately. See more at [[Design] Type-coercion in Arrow Python UDFs](https://docs.google.com/document/d/e/2PACX-1vTEGElOZfhl9NfgbBw4CTrlm-8F_xQCAKNOXouz-7mg5vYobS7lCGUsGkDZxPY0wV5YkgoZmkYlxccU/pub). ### Does this PR introduce _any_ user-facing change? Yes. FROM ```py >>> df = spark.createDataFrame(['1', '2'], schema='string') df.select(pandas_udf(lambda x: x, 'int')('value')).show() >>> df.select(pandas_udf(lambda x: x, 'int')('value')).show() ... org.apache.spark.api.python.PythonException: Traceback (most recent call last): ... pyarrow.lib.ArrowInvalid: Could not convert '1' with type str: tried to convert to int32 ``` TO ```py >>> df = spark.createDataFrame(['1', '2'], schema='string') >>> df.select(pandas_udf(lambda x: x, 'int')('value')).show() +---+ |(value)| +---+ | 1| | 2| +---+ ``` ### How was this patch tested? Unit tests. Closes #41503 from xinrong-meng/type_coersion. 
Authored-by: Xinrong Meng Signed-off-by: Xinrong Meng --- python/pyspark/sql/pandas/serializers.py | 30 ++--- python/pyspark/sql/tests/test_arrow_python_udf.py | 39 +++ python/pyspark/worker.py | 3 ++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 307fcc33752..a99eda9cbea 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -190,7 +190,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): ) return converter(s) -def _create_array(self, series, arrow_type, spark_type=None): +def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False): """ Create an Arrow Array from the given pandas.Series and optional type. @@ -202,6 +202,9 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): If None, pyarrow's inferred type will be used spark_type : DataType, optional If None, spark type converted from arrow_type will be used +arrow_cast: bool, optional +Whether to apply Arrow casting when the user-specified return type mismatches the +actual return values. 
Returns --- @@ -226,7 +229,12 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer): else: mask = series.isnull() try: -return pa.Array.from_pandas(series, mask=mask, type=arrow_type, safe=self._safecheck) +if arrow_cast: +return pa.Array.from_pandas(series, mask=mask, type=arrow_type).cast( +target_type=arrow_type, safe=self._safecheck +) +else: +return pa.Array.from_pandas(series, mask=mask, safe=self._safecheck) except TypeError as e: error_msg = ( "Exception thrown when converting pandas.Series (%s) " @@ -319,12 +327,14 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): df_for_struct=False, struct_in_pandas="dict", ndarray_as_list=False, +arrow_cast=False, ): super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck) self._assign_cols_by_name = assign_cols_by_name self._df_for_struct = df_for_struct self._struct_in_pandas = struct_in_pandas self._ndarray_as_list = ndarray_as_list +self._arrow_cast = arrow_cast def arrow_to_pandas(self, arrow_column): import pyarrow.types as types @@ -386,7 +396,13 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer): # Assign result columns by schema name if user labeled with
[spark] branch master updated: [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c9734008401 [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID c9734008401 is described below commit c9734008401ce7adfb154cda5496c808b2d76580 Author: Raghu Angadi AuthorDate: Thu Jun 29 09:25:15 2023 -0700 [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID [This is a continuation of #41146, to change the author of the PR. Retains the description.] ### What changes were proposed in this pull request? This change adds a new spark connect relation type `CachedRemoteRelation`, which can represent a DataFrame that's been cached on the server side. On the server side, each `SessionHolder` has a cache to maintain mapping from Dataframe ID to actual dataframe. On the client side, a new relation type and function are added. The new function can create a DataFrame reference given a key. The key is the id of a cached DataFrame, which is usually passed from server to the client. When transforming the DataFrame reference, the server finds the actual DataFrame from the cache and replaces it. One use case of this function will be streaming foreachBatch(). Server needs to call user function for every batch which takes a DataFrame as argument. With the new function, we can cache the DataFrame on the server and pass the id back to the client, which can then create the DataFrame reference. ### Why are the changes needed? This change is needed to support streaming foreachBatch() in Spark Connect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Scala unit test. Manual test. (More end to end test will be added when foreachBatch() is supported. Currently there is no way to add a dataframe to the server cache using Python.) Closes #41580 from rangadi/df-ref. 
Authored-by: Raghu Angadi Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/relations.proto| 7 + .../sql/connect/planner/SparkConnectPlanner.scala | 8 + .../spark/sql/connect/service/SessionHolder.scala | 32 +++ .../service/SparkConnectSessionHodlerSuite.scala | 82 +++ python/pyspark/sql/connect/plan.py | 14 ++ python/pyspark/sql/connect/proto/relations_pb2.py | 270 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 28 +++ 7 files changed, 313 insertions(+), 128 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index d29ab02f86a..29405a1332b 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -70,6 +70,7 @@ message Relation { ApplyInPandasWithState apply_in_pandas_with_state = 34; HtmlString html_string = 35; CachedLocalRelation cached_local_relation = 36; +CachedRemoteRelation cached_remote_relation = 37; // NA functions NAFill fill_na = 90; @@ -398,6 +399,12 @@ message CachedLocalRelation { string hash = 3; } +// Represents a remote relation that has been cached on server. +message CachedRemoteRelation { + // (Required) ID of the remote related (assigned by the service). + string relation_id = 1; +} + // Relation of type [[Sample]] that samples a fraction of the dataset. message Sample { // (Required) Input relation for a Sample. 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index cecf14a7045..cdad4fc6190 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -162,6 +162,8 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { transformCoGroupMap(rel.getCoGroupMap) case proto.Relation.RelTypeCase.APPLY_IN_PANDAS_WITH_STATE => transformApplyInPandasWithState(rel.getApplyInPandasWithState) + case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION => +transformCachedRemoteRelation(rel.getCachedRemoteRelation) case proto.Relation.RelTypeCase.COLLECT_METRICS => transformCollectMetrics(rel.getCollectMetrics) case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse) @@ -897,6 +899,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { .logicalPlan } +
[spark] branch master updated: [SPARK-44227][SQL] Extract SchemaUtils from StructField
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0ae86c0492c [SPARK-44227][SQL] Extract SchemaUtils from StructField 0ae86c0492c is described below commit 0ae86c0492cdeac6240ea8283f5bd93a8bf6e7a8 Author: Rui Wang AuthorDate: Thu Jun 29 09:03:35 2023 -0700 [SPARK-44227][SQL] Extract SchemaUtils from StructField ### What changes were proposed in this pull request? Extract SchemaUtils from StructField so that StructField does not require anything from Catalyst. ### Why are the changes needed? To help StructField get rid of the need to depend on Catalyst. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #41772 from amaliujia/move_schema_utils. Authored-by: Rui Wang Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/util/SparkSchemaUtils.scala | 36 ++ .../org/apache/spark/sql/types/StructField.scala | 4 +-- .../org/apache/spark/sql/util/SchemaUtils.scala| 11 ++- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala new file mode 100644 index 000..efb833f0a2b --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkSchemaUtils.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +/** + * Utils for handling schemas. + */ +private[spark] object SparkSchemaUtils { + /** + * @param str The string to be escaped. + * @return The escaped string. + */ + def escapeMetaCharacters(str: String): String = { +str.replaceAll("\n", "n") + .replaceAll("\r", "r") + .replaceAll("\t", "t") + .replaceAll("\f", "f") + .replaceAll("\b", "b") + .replaceAll("\u000B", "v") + .replaceAll("\u0007", "a") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala index e03b2e8ab3c..dd267ed763e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala @@ -24,7 +24,7 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.catalyst.util.StringConcat -import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.util.SparkSchemaUtils /** * A field inside a StructType. 
@@ -51,7 +51,7 @@ case class StructField( stringConcat: StringConcat, maxDepth: Int): Unit = { if (maxDepth > 0) { - stringConcat.append(s"$prefix-- ${SchemaUtils.escapeMetaCharacters(name)}: " + + stringConcat.append(s"$prefix-- ${SparkSchemaUtils.escapeMetaCharacters(name)}: " + s"${dataType.typeName} (nullable = $nullable)\n") DataType.buildFormattedString(dataType, s"$prefix|", stringConcat, maxDepth) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index d202900381a..4b0e5308e65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpress import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} +import
[spark] branch master updated: [SPARK-44208][CORE][SQL] Assign clear error class names for some logic that directly uses exceptions
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a9129defc0e [SPARK-44208][CORE][SQL] Assign clear error class names for some logic that directly uses exceptions a9129defc0e is described below commit a9129defc0ebbe68f20ec888352c30a90925d7ea Author: panbingkun AuthorDate: Thu Jun 29 17:31:03 2023 +0300 [SPARK-44208][CORE][SQL] Assign clear error class names for some logic that directly uses exceptions ### What changes were proposed in this pull request? The pr aims to assign clear error class names for some logic that directly uses exceptions, include: - ALL_PARTITION_COLUMNS_NOT_ALLOWED - INVALID_HIVE_COLUMN_NAME - SPECIFY_BUCKETING_IS_NOT_ALLOWED - SPECIFY_PARTITION_IS_NOT_ALLOWED - UNSUPPORTED_ADD_FILE.DIRECTORY - UNSUPPORTED_ADD_FILE.LOCAL_DIRECTORY ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Update UT. - Pass GA. Closes #41740 from panbingkun/assign_new_name. 
Lead-authored-by: panbingkun Co-authored-by: panbingkun <84731...@qq.com> Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 42 +++--- .../main/scala/org/apache/spark/SparkContext.scala | 7 ++-- .../org/apache/spark/errors/SparkCoreErrors.scala | 14 .../spark/sql/errors/QueryCompilationErrors.scala | 2 +- .../spark/sql/execution/datasources/rules.scala| 16 + .../spark/sql/execution/command/DDLSuite.scala | 34 +- .../spark/sql/hive/HiveExternalCatalog.scala | 12 --- .../spark/sql/hive/execution/HiveDDLSuite.scala| 12 +++ 8 files changed, 97 insertions(+), 42 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 192a0747dfd..6db8c5e3bf1 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -4,6 +4,11 @@ "Non-deterministic expression should not appear in the arguments of an aggregate function." ] }, + "ALL_PARTITION_COLUMNS_NOT_ALLOWED" : { +"message" : [ + "Cannot use all columns for partition columns." +] + }, "ALTER_TABLE_COLUMN_DESCRIPTOR_DUPLICATE" : { "message" : [ "ALTER TABLE column specifies descriptor \"\" more than once, which is invalid." @@ -1180,6 +1185,11 @@ ], "sqlState" : "22023" }, + "INVALID_HIVE_COLUMN_NAME" : { +"message" : [ + "Cannot create the table having the nested column whose name contains invalid characters in Hive metastore." +] + }, "INVALID_IDENTIFIER" : { "message" : [ "The identifier is invalid. Please, consider quoting it with back-quotes as ``." @@ -2081,6 +2091,16 @@ "sortBy must be used together with bucketBy." ] }, + "SPECIFY_BUCKETING_IS_NOT_ALLOWED" : { +"message" : [ + "Cannot specify bucketing information if the table schema is not specified when creating and will be inferred at runtime." 
+] + }, + "SPECIFY_PARTITION_IS_NOT_ALLOWED" : { +"message" : [ + "It is not allowed to specify partition columns when the table schema is not defined. When the table schema is not provided, schema and partition columns will be inferred." +] + }, "SQL_CONF_NOT_FOUND" : { "message" : [ "The SQL config cannot be found. Please verify that the config exists." @@ -2303,6 +2323,23 @@ "Attempted to unset non-existent properties [] in table ." ] }, + "UNSUPPORTED_ADD_FILE" : { +"message" : [ + "Don't support add file." +], +"subClass" : { + "DIRECTORY" : { +"message" : [ + "The file is a directory, consider to set \"spark.sql.legacy.addSingleFileInAddFile\" to \"false\"." +] + }, + "LOCAL_DIRECTORY" : { +"message" : [ + "The local directory is not supported in a non-local master mode." +] + } +} + }, "UNSUPPORTED_ARROWTYPE" : { "message" : [ "Unsupported arrow type ." @@ -3588,11 +3625,6 @@ "Cannot use for partition column." ] }, - "_LEGACY_ERROR_TEMP_1154" : { -"message" : [ - "Cannot use all columns for partition columns." -] - }, "_LEGACY_ERROR_TEMP_1155" : { "message" : [ "Partition column `` not found in schema ." diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 24d788ff5bc..78c7ecb2782 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++
[spark] branch branch-3.4 updated: [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE mode with corrupt record
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new ad29290a02f [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE mode with corrupt record ad29290a02f is described below commit ad29290a02fb94a958fd21e301100338c9f5b82a Author: Jia Fan AuthorDate: Thu Jun 29 16:38:02 2023 +0300 [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE mode with corrupt record ### What changes were proposed in this pull request? cherry pick #41662 , fix parse array as struct bug on branch 3.4 ### Why are the changes needed? Fix the bug when parse array as struct using PERMISSIVE mode with corrupt record ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? add new test. Closes #41784 from Hisoka-X/SPARK-44079_3.4_cherry_pick. 
Authored-by: Jia Fan Signed-off-by: Max Gekk --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- .../spark/sql/catalyst/json/JacksonParser.scala | 20 +++- .../spark/sql/catalyst/util/BadRecordException.scala | 14 -- .../spark/sql/catalyst/util/FailureSafeParser.scala | 9 +++-- .../sql/execution/datasources/json/JsonSuite.scala | 15 +++ 5 files changed, 51 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 42e03630b14..b58649da61c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -318,7 +318,7 @@ class UnivocityParser( if (tokens == null) { throw BadRecordException( () => getCurrentInput, -() => None, +() => Array.empty, QueryExecutionErrors.malformedCSVRecordError("")) } @@ -362,7 +362,7 @@ class UnivocityParser( } else { if (badRecordException.isDefined) { throw BadRecordException( - () => currentInput, () => requiredRow.headOption, badRecordException.get) + () => currentInput, () => Array(requiredRow.get), badRecordException.get) } else { requiredRow } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index bf07d65caa0..d9bff3dc7ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -135,7 +135,7 @@ class JacksonParser( // List([str_a_2,null], [null,str_b_3]) // case START_ARRAY if allowArrayAsStructs => -val array = convertArray(parser, elementConverter, isRoot = true) +val array = convertArray(parser, elementConverter, isRoot = true, arrayAsStructs = true) // Here, as we support reading 
top level JSON arrays and take every element // in such an array as a row, this case is possible. if (array.numElements() == 0) { @@ -517,7 +517,8 @@ class JacksonParser( private def convertArray( parser: JsonParser, fieldConverter: ValueConverter, - isRoot: Boolean = false): ArrayData = { + isRoot: Boolean = false, + arrayAsStructs: Boolean = false): ArrayData = { val values = ArrayBuffer.empty[Any] var badRecordException: Option[Throwable] = None @@ -537,6 +538,9 @@ class JacksonParser( if (badRecordException.isEmpty) { arrayData +} else if (arrayAsStructs) { + throw PartialResultArrayException(arrayData.toArray[InternalRow](schema), +badRecordException.get) } else { throw PartialResultException(InternalRow(arrayData), badRecordException.get) } @@ -570,7 +574,7 @@ class JacksonParser( // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. -throw BadRecordException(() => recordLiteral(record), () => None, e) +throw BadRecordException(() => recordLiteral(record), () => Array.empty, e) case e: CharConversionException if options.encoding.isEmpty => val msg = """JSON parser cannot handle a character in its input. @@ -578,11 +582,17 @@ class JacksonParser( |""".stripMargin + e.getMessage val wrappedCharException = new CharConversionException(msg)
[spark] branch master updated: [MINOR][TESTS] Fix potential bug for AlterTableTest
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6511a3e9020 [MINOR][TESTS] Fix potential bug for AlterTableTest 6511a3e9020 is described below commit 6511a3e90206473985c2d6fd28d06eb7bcf8c98f Author: panbingkun AuthorDate: Thu Jun 29 12:28:03 2023 +0300 [MINOR][TESTS] Fix potential bug for AlterTableTest ### What changes were proposed in this pull request? The pr aims to fix potential bug for `AlterTableTest`. ### Why are the changes needed? Fix bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Manually test. - Pass GA. Closes #41783 from panbingkun/AlterTableTests_fix. Authored-by: panbingkun Signed-off-by: Max Gekk --- .../spark/sql/connector/AlterTableTests.scala | 373 + 1 file changed, 164 insertions(+), 209 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 2047212a4ea..122b3ab07e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -42,7 +42,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { if (catalogAndNamespace.isEmpty) { s"default.$tableName" } else { - s"${catalogAndNamespace}table_name" + s"$catalogAndNamespace$tableName" } } @@ -63,7 +63,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { } test("AlterTable: change rejected by implementation") { -val t = s"${catalogAndNamespace}table_name" +val t = fullTableName("table_name") withTable(t) { sql(s"CREATE TABLE $t (id int) USING $v2Format") @@ -74,38 +74,35 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { 
assert(exc.getMessage.contains("Unsupported table change")) assert(exc.getMessage.contains("Cannot drop all fields")) // from the implementation - val tableName = fullTableName(t) - val table = getTableMetadata(tableName) + val table = getTableMetadata(t) - assert(table.name === tableName) + assert(table.name === t) assert(table.schema === new StructType().add("id", IntegerType)) } } test("AlterTable: add top-level column") { -val t = s"${catalogAndNamespace}table_name" +val t = fullTableName("table_name") withTable(t) { sql(s"CREATE TABLE $t (id int) USING $v2Format") sql(s"ALTER TABLE $t ADD COLUMN data string") - val tableName = fullTableName(t) - val table = getTableMetadata(tableName) + val table = getTableMetadata(t) - assert(table.name === tableName) + assert(table.name === t) assert(table.schema === new StructType().add("id", IntegerType).add("data", StringType)) } } test("AlterTable: add column with NOT NULL") { -val t = s"${catalogAndNamespace}table_name" +val t = fullTableName("table_name") withTable(t) { sql(s"CREATE TABLE $t (id int) USING $v2Format") sql(s"ALTER TABLE $t ADD COLUMN data string NOT NULL") - val tableName = fullTableName(t) - val table = getTableMetadata(tableName) + val table = getTableMetadata(t) - assert(table.name === tableName) + assert(table.name === t) assert(table.schema === StructType(Seq( StructField("id", IntegerType), StructField("data", StringType, nullable = false @@ -113,15 +110,14 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { } test("AlterTable: add column with comment") { -val t = s"${catalogAndNamespace}table_name" +val t = fullTableName("table_name") withTable(t) { sql(s"CREATE TABLE $t (id int) USING $v2Format") sql(s"ALTER TABLE $t ADD COLUMN data string COMMENT 'doc'") - val tableName = fullTableName(t) - val table = getTableMetadata(tableName) + val table = getTableMetadata(t) - assert(table.name === tableName) + assert(table.name === t) assert(table.schema === StructType(Seq( 
StructField("id", IntegerType), StructField("data", StringType).withComment("doc" @@ -129,7 +125,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase { } test("AlterTable: add column with interval type") { -val t = s"${catalogAndNamespace}table_name" +val t = fullTableName("table_name") withTable(t) { sql(s"CREATE TABLE $t (id int, point struct) USING $v2Format") val e1 = @@ -142,18 +138,17 @@ trait AlterTableTests extends SharedSparkSession with
[spark] branch master updated: [SPARK-44169][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2300-2304]
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ffbd1a3b5b1 [SPARK-44169][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2300-2304] ffbd1a3b5b1 is described below commit ffbd1a3b5b17386759a378dee5ef5cf6df7f2d09 Author: Jiaan Geng AuthorDate: Thu Jun 29 12:26:24 2023 +0300 [SPARK-44169][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2300-2304] ### What changes were proposed in this pull request? The pr aims to assign names to the error class _LEGACY_ERROR_TEMP_[2300-2304]. ### Why are the changes needed? Improve the error framework. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Exists test cases updated and added new test cases. Closes #41719 from beliefer/SPARK-44169. Authored-by: Jiaan Geng Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 74 --- .../catalyst/analysis/ResolveInlineTables.scala| 12 +- .../catalyst/analysis/higherOrderFunctions.scala | 14 +- .../analysis/ResolveLambdaVariablesSuite.scala | 18 +- .../spark/sql/execution/datasources/rules.scala| 4 +- .../sql-tests/analyzer-results/cte.sql.out | 4 +- .../analyzer-results/inline-table.sql.out | 12 +- .../analyzer-results/postgreSQL/boolean.sql.out| 2 +- .../postgreSQL/window_part3.sql.out| 2 +- .../postgreSQL/window_part4.sql.out| 2 +- .../analyzer-results/udf/udf-inline-table.sql.out | 12 +- .../test/resources/sql-tests/results/cte.sql.out | 4 +- .../sql-tests/results/inline-table.sql.out | 12 +- .../sql-tests/results/postgreSQL/boolean.sql.out | 2 +- .../results/postgreSQL/window_part3.sql.out| 2 +- .../results/postgreSQL/window_part4.sql.out| 2 +- .../sql-tests/results/udf/udf-inline-table.sql.out | 12 +- .../apache/spark/sql/ColumnExpressionSuite.scala | 33 +++- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 219 
+++-- 19 files changed, 297 insertions(+), 145 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index e441686432a..192a0747dfd 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -704,11 +704,6 @@ ], "sqlState" : "42K04" }, - "FAILED_SQL_EXPRESSION_EVALUATION" : { -"message" : [ - "Failed to evaluate the SQL expression: . Please check your syntax and ensure all required tables and columns are available." -] - }, "FIELD_NOT_FOUND" : { "message" : [ "No such struct field in ." @@ -1197,6 +1192,28 @@ ], "sqlState" : "22003" }, + "INVALID_INLINE_TABLE" : { +"message" : [ + "Invalid inline table." +], +"subClass" : { + "CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE" : { +"message" : [ + "Cannot evaluate the expression in inline table definition." +] + }, + "FAILED_SQL_EXPRESSION_EVALUATION" : { +"message" : [ + "Failed to evaluate the SQL expression . Please check your syntax and ensure all required tables and columns are available." +] + }, + "INCOMPATIBLE_TYPES_IN_INLINE_TABLE" : { +"message" : [ + "Found incompatible types in the column for inline table." +] + } +} + }, "INVALID_JSON_ROOT_FIELD" : { "message" : [ "Cannot convert JSON root field to target Spark type." @@ -1209,6 +1226,23 @@ ], "sqlState" : "22032" }, + "INVALID_LAMBDA_FUNCTION_CALL" : { +"message" : [ + "Invalid lambda function call." +], +"subClass" : { + "DUPLICATE_ARG_NAMES" : { +"message" : [ + "The lambda function has duplicate arguments . Please, consider to rename the argument names or set to \"true\"." +] + }, + "NUM_ARGS_MISMATCH" : { +"message" : [ + "A higher order function expects arguments, but got ." +] + } +} + }, "INVALID_LATERAL_JOIN_TYPE" : { "message" : [ "The JOIN with LATERAL correlation is not allowed because an OUTER subquery cannot correlate to its join partner. 
Remove the LATERAL correlation or use an INNER JOIN, or LEFT OUTER JOIN instead." @@ -1654,6 +1688,11 @@ ], "sqlState" : "42803" }, + "MULTI_SOURCES_UNSUPPORTED_FOR_EXPRESSION" : { +"message" : [ + "The expression does not support more than one source." +] + }, "MULTI_UDF_INTERFACE_ERROR" : { "message" : [ "Not allowed to implement multiple UDF interfaces, UDF class ." @@ -5492,31 +5531,6 @@
[spark-docker] branch master updated: [SPARK-40513] Add --batch to gpg command
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new 58d2885 [SPARK-40513] Add --batch to gpg command 58d2885 is described below commit 58d288546e8419d229f14b62b6a653999e0390f1 Author: Yikun Jiang AuthorDate: Thu Jun 29 16:05:47 2023 +0800 [SPARK-40513] Add --batch to gpg command ### What changes were proposed in this pull request? Add --batch to gpg command which essentially puts GnuPG into "API mode" instead of "UI mode". Apply changes to 3.4.x dockerfile. ### Why are the changes needed? Address DOI comments: https://github.com/docker-library/official-images/pull/13089#issuecomment-1611814491 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI passed Closes #51 from Yikun/batch. Authored-by: Yikun Jiang Signed-off-by: Yikun Jiang --- 3.4.0/scala2.12-java11-ubuntu/Dockerfile | 4 ++-- 3.4.1/scala2.12-java11-ubuntu/Dockerfile | 4 ++-- Dockerfile.template | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/3.4.0/scala2.12-java11-ubuntu/Dockerfile b/3.4.0/scala2.12-java11-ubuntu/Dockerfile index 854f86c..a4b081e 100644 --- a/3.4.0/scala2.12-java11-ubuntu/Dockerfile +++ b/3.4.0/scala2.12-java11-ubuntu/Dockerfile @@ -46,8 +46,8 @@ RUN set -ex; \ wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \ wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \ export GNUPGHOME="$(mktemp -d)"; \ -gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ -gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ +gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ +gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ gpg --batch --verify spark.tgz.asc spark.tgz; \ gpgconf --kill all; \ rm -rf "$GNUPGHOME" spark.tgz.asc; \ diff --git a/3.4.1/scala2.12-java11-ubuntu/Dockerfile 
b/3.4.1/scala2.12-java11-ubuntu/Dockerfile index 6d62769..d8bba7e 100644 --- a/3.4.1/scala2.12-java11-ubuntu/Dockerfile +++ b/3.4.1/scala2.12-java11-ubuntu/Dockerfile @@ -46,8 +46,8 @@ RUN set -ex; \ wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \ wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \ export GNUPGHOME="$(mktemp -d)"; \ -gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ -gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ +gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ +gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ gpg --batch --verify spark.tgz.asc spark.tgz; \ gpgconf --kill all; \ rm -rf "$GNUPGHOME" spark.tgz.asc; \ diff --git a/Dockerfile.template b/Dockerfile.template index 80b57e2..3d0aacf 100644 --- a/Dockerfile.template +++ b/Dockerfile.template @@ -46,8 +46,8 @@ RUN set -ex; \ wget -nv -O spark.tgz "$SPARK_TGZ_URL"; \ wget -nv -O spark.tgz.asc "$SPARK_TGZ_ASC_URL"; \ export GNUPGHOME="$(mktemp -d)"; \ -gpg --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ -gpg --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ +gpg --batch --keyserver hkps://keys.openpgp.org --recv-key "$GPG_KEY" || \ +gpg --batch --keyserver hkps://keyserver.ubuntu.com --recv-keys "$GPG_KEY"; \ gpg --batch --verify spark.tgz.asc spark.tgz; \ gpgconf --kill all; \ rm -rf "$GNUPGHOME" spark.tgz.asc; \ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-docker] branch master updated: [SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key fingerprint
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new 39264c5 [SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key fingerprint 39264c5 is described below commit 39264c502cf21b71a1ab5da71760e5864abce099 Author: Yikun Jiang AuthorDate: Thu Jun 29 16:04:50 2023 +0800 [SPARK-44168][FOLLOWUP] Change v3.4 GPG_KEY to full key fingerprint ### What changes were proposed in this pull request? Change GPG key from `34F0FC5C` to `F28C9C925C188C35E345614DEDA00CE834F0FC5C` to avoid potential collision. The full fingerprint can be obtained with the commands below: ``` $ wget https://dist.apache.org/repos/dist/dev/spark/KEYS $ gpg --import KEYS $ gpg --fingerprint 34F0FC5C pub rsa4096 2015-05-05 [SC] F28C 9C92 5C18 8C35 E345 614D EDA0 0CE8 34F0 FC5C uid [ unknown] Dongjoon Hyun (CODE SIGNING KEY) sub rsa4096 2015-05-05 [E] ``` ### Why are the changes needed? - A short gpg key had been added as v3.4.0 gpg key in https://github.com/apache/spark-docker/pull/46 . - The short key `34F0FC5C` is from https://dist.apache.org/repos/dist/dev/spark/KEYS - According to the DOI review comments, https://github.com/docker-library/official-images/pull/13089#issuecomment-1609990551 , `this should be the full key fingerprint: F28C9C925C188C35E345614DEDA00CE834F0FC5C (generating a collision for such a short key ID is trivial.` - We should switch from the short key to the full fingerprint ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI passed Closes #50 from Yikun/gpg_key. 
Authored-by: Yikun Jiang Signed-off-by: Yikun Jiang --- 3.4.1/scala2.12-java11-ubuntu/Dockerfile | 2 +- tools/template.py| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/3.4.1/scala2.12-java11-ubuntu/Dockerfile b/3.4.1/scala2.12-java11-ubuntu/Dockerfile index bf106a6..6d62769 100644 --- a/3.4.1/scala2.12-java11-ubuntu/Dockerfile +++ b/3.4.1/scala2.12-java11-ubuntu/Dockerfile @@ -38,7 +38,7 @@ RUN set -ex; \ # https://downloads.apache.org/spark/KEYS ENV SPARK_TGZ_URL=https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz \ SPARK_TGZ_ASC_URL=https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz.asc \ -GPG_KEY=34F0FC5C +GPG_KEY=F28C9C925C188C35E345614DEDA00CE834F0FC5C RUN set -ex; \ export SPARK_TMP="$(mktemp -d)"; \ diff --git a/tools/template.py b/tools/template.py index 93e842a..cdc167c 100755 --- a/tools/template.py +++ b/tools/template.py @@ -31,7 +31,7 @@ GPG_KEY_DICT = { # issuer "xinr...@apache.org" "3.4.0": "CC68B3D16FE33A766705160BA7E57908C7A4E1B1", # issuer "dongj...@apache.org" -"3.4.1": "34F0FC5C" +"3.4.1": "F28C9C925C188C35E345614DEDA00CE834F0FC5C" } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (af536459501 -> 70f34278cbf)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from af536459501 [SPARK-44237][CORE] Simplify DirectByteBuffer constructor lookup logic add 70f34278cbf [SPARK-44079][SQL] Fix `ArrayIndexOutOfBoundsException` when parse array as struct using PERMISSIVE mode with corrupt record No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/csv/UnivocityParser.scala | 4 ++-- .../spark/sql/catalyst/json/JacksonParser.scala | 20 +++- .../spark/sql/catalyst/util/BadRecordException.scala | 14 -- .../spark/sql/catalyst/util/FailureSafeParser.scala | 9 +++-- .../sql/execution/datasources/json/JsonSuite.scala | 15 +++ 5 files changed, 51 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org