[spark] branch master updated (4d33ee07227 -> 58490da6d2e)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4d33ee07227 [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates add 58490da6d2e [SPARK-40800][SQL] Always inline expressions in OptimizeOneRowRelationSubquery No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++- .../spark/sql/catalyst/optimizer/subquery.scala| 4 ++- .../scala/org/apache/spark/sql/SubquerySuite.scala | 39 +++--- 3 files changed, 42 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (74c82642941 -> 4d33ee07227)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 74c82642941 [SPARK-40812][CONNECT][PYTHON][FOLLOW-UP] Improve Deduplicate in Python client add 4d33ee07227 [SPARK-36114][SQL] Support subqueries with correlated non-equality predicates No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/CheckAnalysis.scala | 7 +- .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- .../resources/sql-tests/inputs/join-lateral.sql| 3 + .../scalar-subquery/scalar-subquery-select.sql | 45 + .../sql-tests/results/join-lateral.sql.out | 9 ++ .../scalar-subquery/scalar-subquery-select.sql.out | 107 + .../sql-tests/results/udf/udf-except.sql.out | 17 +--- .../scala/org/apache/spark/sql/SubquerySuite.scala | 59 +--- 8 files changed, 195 insertions(+), 54 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40812][CONNECT][PYTHON][FOLLOW-UP] Improve Deduplicate in Python client
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 74c82642941 [SPARK-40812][CONNECT][PYTHON][FOLLOW-UP] Improve Deduplicate in Python client 74c82642941 is described below commit 74c826429416493a6d1d0efdf83b0e561dc33591 Author: Rui Wang AuthorDate: Mon Oct 24 10:50:55 2022 +0800 [SPARK-40812][CONNECT][PYTHON][FOLLOW-UP] Improve Deduplicate in Python client ### What changes were proposed in this pull request? Following up on https://github.com/apache/spark/pull/38276, this PR improves both the `distinct()` and `dropDuplicates` DataFrame APIs in the Python client, both of which depend on the `Deduplicate` plan in the Connect proto. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38327 from amaliujia/python_deduplicate. Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- python/pyspark/sql/connect/dataframe.py| 41 +++--- python/pyspark/sql/connect/plan.py | 39 .../sql/tests/connect/test_connect_plan_only.py| 19 ++ 3 files changed, 95 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index eabcf433ae9..2b7e3d52039 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -157,11 +157,44 @@ class DataFrame(object): def describe(self, cols: List[ColumnRef]) -> Any: ... +def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, +optionally only deduplicating based on certain columns. + +.. versionadded:: 3.4.0 + +Parameters +-- +subset : List of column names, optional +List of columns to use for duplicate comparison (default All columns). 
+ +Returns +--- +:class:`DataFrame` +DataFrame without duplicated rows. +""" +if subset is None: +return DataFrame.withPlan( +plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session +) +else: +return DataFrame.withPlan( +plan.Deduplicate(child=self._plan, column_names=subset), session=self._session +) + def distinct(self) -> "DataFrame": -"""Returns all distinct rows.""" -all_cols = self.columns -gf = self.groupBy(*all_cols) -return gf.agg() +"""Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. + +.. versionadded:: 3.4.0 + +Returns +--- +:class:`DataFrame` +DataFrame with distinct rows. +""" +return DataFrame.withPlan( +plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session +) def drop(self, *cols: "ColumnOrString") -> "DataFrame": all_cols = self.columns diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 297b15994d3..d6b6f9e3b67 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -327,6 +327,45 @@ class Offset(LogicalPlan): """ +class Deduplicate(LogicalPlan): +def __init__( +self, +child: Optional["LogicalPlan"], +all_columns_as_keys: bool = False, +column_names: Optional[List[str]] = None, +) -> None: +super().__init__(child) +self.all_columns_as_keys = all_columns_as_keys +self.column_names = column_names + +def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: +assert self._child is not None +plan = proto.Relation() +plan.deduplicate.all_columns_as_keys = self.all_columns_as_keys +if self.column_names is not None: +plan.deduplicate.column_names.extend(self.column_names) +return plan + +def print(self, indent: int = 0) -> str: +c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else "" +return ( +f"{' ' * indent}\n{c_buf}" +) + +def _repr_html_(self) -> str: +return f""" + + +Deduplicate +all_columns_as_keys: {self.all_columns_as_keys} 
+column_names: {self.column_names} +{self._child_repr_()} + + +""" + + class Sort(LogicalPlan): def __init__( self, child: Optional["LogicalPlan"], *columns:
[spark] tag v3.3.1 created (now fbbcf9434ac)
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a change to tag v3.3.1 in repository https://gitbox.apache.org/repos/asf/spark.git at fbbcf9434ac (commit) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (79aae64380f -> f7eee095049)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 79aae64380f [SPARK-40849][SS] Async log purge add f7eee095049 [SPARK-40880][SQL][FOLLOW-UP] Remove unused imports No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/sql/execution/stat/StatFunctions.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40849][SS] Async log purge
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 79aae64380f [SPARK-40849][SS] Async log purge 79aae64380f is described below commit 79aae64380ff83570549cb8c4ed85ffb022fc8eb Author: Jerry Peng AuthorDate: Mon Oct 24 11:09:40 2022 +0900 [SPARK-40849][SS] Async log purge ### What changes were proposed in this pull request? Purging old entries in both the offset log and commit log will be done asynchronously. For every micro-batch, older entries in both offset log and commit log are deleted. This is done so that the offset log and commit log do not continually grow. Please reference logic here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L539 The time spent performing these log purges is grouped with the “walCommit” execution time in the StreamingProgressListener metrics. Around two thirds of the “walCommit” execution time is performing these purge operations thus making these operations asynchronous will also reduce latency. Also, we do not necessarily need to perform the purges every micro-batch. When these purges are executed asynchronously, they do not need to block micro-batch execution and we don’t need to start a [...] ### Why are the changes needed? Decrease microbatch processing latency ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #38313 from jerrypeng/SPARK-40849. 
Authored-by: Jerry Peng Signed-off-by: Jungtaek Lim --- .../scala/org/apache/spark/util/ThreadUtils.scala | 4 +- .../org/apache/spark/sql/internal/SQLConf.scala| 9 +++ .../sql/execution/streaming/AsyncLogPurge.scala| 82 + .../sql/execution/streaming/ErrorNotifier.scala| 46 .../execution/streaming/MicroBatchExecution.scala | 22 +- .../sql/execution/streaming/StreamExecution.scala | 7 ++ .../streaming/MicroBatchExecutionSuite.scala | 85 +- 7 files changed, 249 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index d45dc937910..99b4e894bf0 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -162,9 +162,9 @@ private[spark] object ThreadUtils { /** * Wrapper over newSingleThreadExecutor. */ - def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = { + def newDaemonSingleThreadExecutor(threadName: String): ThreadPoolExecutor = { val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() -Executors.newSingleThreadExecutor(threadFactory) +Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor] } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 72eb420de37..ebff9ce546d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1982,6 +1982,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ASYNC_LOG_PURGE = +buildConf("spark.sql.streaming.asyncLogPurge.enabled") + .internal() + .doc("When true, purging the offset log and " + +"commit log of old entries will be done asynchronously.") + .version("3.4.0") + .booleanConf + .createWithDefault(true) + val 
VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala new file mode 100644 index 000..b3729dbc7b4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless
[spark] branch master updated: [SPARK-40880][SQL] Reimplement `summary` with dataframe operations
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6a0713a141f [SPARK-40880][SQL] Reimplement `summary` with dataframe operations 6a0713a141f is described below commit 6a0713a141fa98d83029d8388508cbbc40fd554e Author: Ruifeng Zheng AuthorDate: Mon Oct 24 10:58:13 2022 +0900 [SPARK-40880][SQL] Reimplement `summary` with dataframe operations ### What changes were proposed in this pull request? Reimplement `summary` with dataframe operations ### Why are the changes needed? 1, do not truncate the sql plan any more; 2, enable sql optimization like column pruning: ``` scala> val df = spark.range(0, 3, 1, 10).withColumn("value", lit("str")) df: org.apache.spark.sql.DataFrame = [id: bigint, value: string] scala> df.summary("max", "50%").show +---+---+-+ |summary| id|value| +---+---+-+ |max| 2| str| |50%| 1| null| +---+---+-+ scala> df.summary("max", "50%").select("id").show +---+ | id| +---+ | 2| | 1| +---+ scala> df.summary("max", "50%").select("id").queryExecution.optimizedPlan res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [element_at(id#367, summary#376, None, false) AS id#371] +- Generate explode([max,50%]), false, [summary#376] +- Aggregate [map(max, cast(max(id#153L) as string), 50%, cast(percentile_approx(id#153L, [0.5], 1, 0, 0)[0] as string)) AS id#367] +- Range (0, 3, step=1, splits=Some(10)) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing UTs and manually check Closes #38346 from zhengruifeng/sql_stat_summary. 
Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../spark/sql/execution/stat/StatFunctions.scala | 122 ++--- 1 file changed, 59 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 484be76b991..508d2c64d09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -21,11 +21,10 @@ import java.util.Locale import org.apache.spark.internal.Logging import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EvalMode, Expression, GenericInternalRow, GetArrayItem, Literal} +import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, EvalMode, GenericInternalRow} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.catalyst.util.{GenericArrayData, QuantileSummaries} +import org.apache.spark.sql.catalyst.util.QuantileSummaries import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -199,9 +198,11 @@ object StatFunctions extends Logging { /** Calculate selected summary statistics for a dataset */ def summary(ds: Dataset[_], statistics: Seq[String]): DataFrame = { - -val defaultStatistics = Seq("count", "mean", "stddev", "min", "25%", "50%", "75%", "max") -val selectedStatistics = if (statistics.nonEmpty) statistics else defaultStatistics +val selectedStatistics = if (statistics.nonEmpty) { + statistics.toArray +} else { + Array("count", "mean", "stddev", "min", "25%", "50%", "75%", "max") +} val percentiles = selectedStatistics.filter(a => a.endsWith("%")).map { p => try { @@ 
-213,71 +214,66 @@ object StatFunctions extends Logging { } require(percentiles.forall(p => p >= 0 && p <= 1), "Percentiles must be in the range [0, 1]") -def castAsDoubleIfNecessary(e: Expression): Expression = if (e.dataType == StringType) { - Cast(e, DoubleType, evalMode = EvalMode.TRY) -} else { - e -} -var percentileIndex = 0 -val statisticFns = selectedStatistics.map { stats => - if (stats.endsWith("%")) { -val index = percentileIndex -percentileIndex += 1 -(child: Expression) => - GetArrayItem( -new ApproximatePercentile(castAsDoubleIfNecessary(child), - Literal(new GenericArrayData(percentiles), ArrayType(DoubleType, false))) - .toAggregateExpression(), -Literal(index)) - } else { -stats.toLowerCase(Locale.ROOT) match { - case "count" => (child: Expression)
[spark] branch master updated (02a2242a450 -> 5d3b1e6ed54)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 02a2242a450 [SPARK-40884][BUILD] Upgrade fabric8io - `kubernetes-client` to 6.2.0 add 5d3b1e6ed54 [SPARK-40877][SQL] Reimplement `crosstab` with dataframe operations No new revisions were added by this update. Summary of changes: .../spark/sql/execution/stat/StatFunctions.scala | 50 -- 1 file changed, 8 insertions(+), 42 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40884][BUILD] Upgrade fabric8io - `kubernetes-client` to 6.2.0
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 02a2242a450 [SPARK-40884][BUILD] Upgrade fabric8io - `kubernetes-client` to 6.2.0 02a2242a450 is described below commit 02a2242a45062755bf7e20805958d5bdf1f5ed74 Author: Bjørn AuthorDate: Mon Oct 24 10:32:18 2022 +0900 [SPARK-40884][BUILD] Upgrade fabric8io - `kubernetes-client` to 6.2.0 ### What changes were proposed in this pull request? Upgrade fabric8io - kubernetes-client from 6.1.1 to 6.2.0 ### Why are the changes needed? [Release notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.2.0) [Snakeyaml version should be updated to mitigate CVE-2022-28857](https://github.com/fabric8io/kubernetes-client/issues/4383) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #38348 from bjornjorgensen/kubernetes-client6.2.0. 
Authored-by: Bjørn Signed-off-by: Hyukjin Kwon --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 48 +-- dev/deps/spark-deps-hadoop-3-hive-2.3 | 48 +-- pom.xml | 2 +- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 6756dd58312..2c1eab56f33 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -160,30 +160,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/2.0.3//jul-to-slf4j-2.0.3.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar -kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar -kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar -kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar -kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar -kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar -kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar -kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar -kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar -kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar -kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar -kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar -kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar -kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar -kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar -kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar -kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar -kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar -kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar 
-kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar -kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar -kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar -kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar -kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar +kubernetes-client-api/6.2.0//kubernetes-client-api-6.2.0.jar +kubernetes-client/6.2.0//kubernetes-client-6.2.0.jar +kubernetes-httpclient-okhttp/6.2.0//kubernetes-httpclient-okhttp-6.2.0.jar +kubernetes-model-admissionregistration/6.2.0//kubernetes-model-admissionregistration-6.2.0.jar +kubernetes-model-apiextensions/6.2.0//kubernetes-model-apiextensions-6.2.0.jar +kubernetes-model-apps/6.2.0//kubernetes-model-apps-6.2.0.jar +kubernetes-model-autoscaling/6.2.0//kubernetes-model-autoscaling-6.2.0.jar +kubernetes-model-batch/6.2.0//kubernetes-model-batch-6.2.0.jar +kubernetes-model-certificates/6.2.0//kubernetes-model-certificates-6.2.0.jar +kubernetes-model-common/6.2.0//kubernetes-model-common-6.2.0.jar +kubernetes-model-coordination/6.2.0//kubernetes-model-coordination-6.2.0.jar +kubernetes-model-core/6.2.0//kubernetes-model-core-6.2.0.jar +kubernetes-model-discovery/6.2.0//kubernetes-model-discovery-6.2.0.jar +kubernetes-model-events/6.2.0//kubernetes-model-events-6.2.0.jar +kubernetes-model-extensions/6.2.0//kubernetes-model-extensions-6.2.0.jar +kubernetes-model-flowcontrol/6.2.0//kubernetes-model-flowcontrol-6.2.0.jar +kubernetes-model-gatewayapi/6.2.0//kubernetes-model-gatewayapi-6.2.0.jar +kubernetes-model-metrics/6.2.0//kubernetes-model-metrics-6.2.0.jar +kubernetes-model-networking/6.2.0//kubernetes-model-networking-6.2.0.jar +kubernetes-model-node/6.2.0//kubernetes-model-node-6.2.0.jar
[spark] branch branch-3.1 updated: [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 7c3887c1ed2 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled 7c3887c1ed2 is described below commit 7c3887c1ed2e23bd0010d3e79a847bad18818461 Author: Peter Toth AuthorDate: Sat Oct 22 10:39:32 2022 +0900 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug the following pyspark script: ``` bin/pyspark --conf spark.io.encryption.enabled=true ... bar = {"a": "aa", "b": "bb"} foo = spark.sparkContext.broadcast(bar) spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "") spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect() ``` fails with: ``` 22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command command = serializer._read_with_length(file) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length return self.loads(obj) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads return cloudpickle.loads(obj, encoding=encoding) EOFError: Ran out of input ``` The reason for this failure is that we have multiple Python UDFs referencing the same broadcast and in the current code: 
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420 the number of broadcasts (`cnt`) is correct (1) but the broadcast id is serialized 2 times from the JVM to Python, ruining the next item that Python expects from the JVM side. Please note that the example above works in Spark 3.3 without this fix. That is because https://github.com/apache/spark/pull/36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`: https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242 changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream` the example accidentally works as the broadcast id is written to `dataOut` once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that https://github.com/apache/spark/pull/36121 introduced the regression as `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which `EncryptedPythonBr [...] To fix a bug. No. Added new UT. Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf. 
Authored-by: Peter Toth Signed-off-by: Hyukjin Kwon (cherry picked from commit 8a96f69bb536729eaa59fae55160f8a6747efbe3) Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 2 +- python/pyspark/tests/test_broadcast.py | 14 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 8d9f2be6218..60689858628 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -360,6 +360,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // the decrypted data to python val idsAndFiles = broadcastVars.flatMap { broadcast => if (!oldBids.contains(broadcast.id)) { + oldBids.add(broadcast.id) Some((broadcast.id, broadcast.value.path)) } else { None @@ -373,7 +374,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( idsAndFiles.foreach { case (id, _) => // send new broadcast dataOut.writeLong(id) -oldBids.add(id) } dataOut.flush() logTrace("waiting for python to read decrypted broadcast data from server") diff --git a/python/pyspark/tests/test_broadcast.py b/python/pyspark/tests/test_broadcast.py index
[spark] branch branch-3.2 updated: [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new b6b49457c0b [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled b6b49457c0b is described below commit b6b49457c0b6f89efb8c458f73228b3b634f7940 Author: Peter Toth AuthorDate: Sat Oct 22 10:39:32 2022 +0900 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug the following pyspark script: ``` bin/pyspark --conf spark.io.encryption.enabled=true ... bar = {"a": "aa", "b": "bb"} foo = spark.sparkContext.broadcast(bar) spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "") spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect() ``` fails with: ``` 22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command command = serializer._read_with_length(file) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length return self.loads(obj) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads return cloudpickle.loads(obj, encoding=encoding) EOFError: Ran out of input ``` The reason for this failure is that we have multiple Python UDFs referencing the same broadcast and in the current code: 
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420 the number of broadcasts (`cnt`) is correct (1) but the broadcast id is serialized 2 times from the JVM to Python, ruining the next item that Python expects from the JVM side. Please note that the example above works in Spark 3.3 without this fix. That is because https://github.com/apache/spark/pull/36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`: https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242 changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream` the example accidentally works as the broadcast id is written to `dataOut` once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that https://github.com/apache/spark/pull/36121 introduced the regression as `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which `EncryptedPythonBr [...] To fix a bug. No. Added new UT. Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf. 
Authored-by: Peter Toth Signed-off-by: Hyukjin Kwon (cherry picked from commit 8a96f69bb536729eaa59fae55160f8a6747efbe3) Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 2 +- python/pyspark/tests/test_broadcast.py | 14 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index fabff970f2b..3a3e7e04e7f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -398,6 +398,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // the decrypted data to python val idsAndFiles = broadcastVars.flatMap { broadcast => if (!oldBids.contains(broadcast.id)) { + oldBids.add(broadcast.id) Some((broadcast.id, broadcast.value.path)) } else { None @@ -411,7 +412,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( idsAndFiles.foreach { case (id, _) => // send new broadcast dataOut.writeLong(id) -oldBids.add(id) } dataOut.flush() logTrace("waiting for python to read decrypted broadcast data from server") diff --git a/python/pyspark/tests/test_broadcast.py b/python/pyspark/tests/test_broadcast.py index
[spark] branch branch-3.3 updated: [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 476ce566c41 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled 476ce566c41 is described below commit 476ce566c412437c0dde6b4006d3685548370784 Author: Peter Toth AuthorDate: Sat Oct 22 10:39:32 2022 +0900 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug the following pyspark script: ``` bin/pyspark --conf spark.io.encryption.enabled=true ... bar = {"a": "aa", "b": "bb"} foo = spark.sparkContext.broadcast(bar) spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "") spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect() ``` fails with: ``` 22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command command = serializer._read_with_length(file) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length return self.loads(obj) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads return cloudpickle.loads(obj, encoding=encoding) EOFError: Ran out of input ``` The reason for this failure is that we have multiple Python UDFs referencing the same broadcast and in the current code: 
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420 the number of broadcasts (`cnt`) is correct (1) but the broadcast id is serialized 2 times from JVM to Python ruining the next item that Python expects from JVM side. Please note that the example above works in Spark 3.3 without this fix. That is because https://github.com/apache/spark/pull/36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`: https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242 changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream` the example accidentally works as the broadcast id is written to `dataOut` once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that https://github.com/apache/spark/pull/36121 introduced the regression as `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which `EncryptedPythonBr [...] To fix a bug. No. Added new UT. Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf. 
Authored-by: Peter Toth Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 2 +- python/pyspark/tests/test_broadcast.py | 14 ++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 15707ab9157..f32c80f3ef5 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -401,6 +401,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // the decrypted data to python val idsAndFiles = broadcastVars.flatMap { broadcast => if (!oldBids.contains(broadcast.id)) { + oldBids.add(broadcast.id) Some((broadcast.id, broadcast.value.path)) } else { None @@ -414,7 +415,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( idsAndFiles.foreach { case (id, _) => // send new broadcast dataOut.writeLong(id) -oldBids.add(id) } dataOut.flush() logTrace("waiting for python to read decrypted broadcast data from server") diff --git a/python/pyspark/tests/test_broadcast.py b/python/pyspark/tests/test_broadcast.py index 56763e8d80a..6dce34c4ca5 100644 --- a/python/pyspark/tests/test_broadcast.py +++
[spark] branch master updated: [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 96b5d50f3ef [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column 96b5d50f3ef is described below commit 96b5d50f3efb97c734f8c370e263a82d34f78d1b Author: Alex Balikov <91913242+alex-bali...@users.noreply.github.com> AuthorDate: Mon Oct 24 08:12:42 2022 +0900 [SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column ### What changes were proposed in this pull request? This PR introduces a window_time function to extract streaming event time from a window column produced by the window aggregating operators. This is one step in a sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming as described in https://issues.apache.org/jira/browse/SPARK-40821 ### Why are the changes needed? The window_time function is a convenience function to compute the correct event time for window aggregate records. Such records produced by window aggregating operators have no explicit event time but rather a window column of type StructType { start: TimestampType, end: TimestampType } where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1. The event time is necessary when chaining other stateful operators after the window aggregating op [...] ### Does this PR introduce _any_ user-facing change? Yes: The PR introduces a new window_time SQL function for both Scala and Python APIs. ### How was this patch tested? Added new unit tests. Closes #38288 from alex-balikov/SPARK-40821-time-window. 
Authored-by: Alex Balikov <91913242+alex-bali...@users.noreply.github.com> Signed-off-by: Jungtaek Lim --- .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/functions.py| 46 +++ python/pyspark/sql/tests/test_functions.py | 16 + .../spark/sql/catalyst/analysis/Analyzer.scala | 238 +- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../sql/catalyst/analysis/ResolveTimeWindows.scala | 346 + .../sql/catalyst/expressions/TimeWindow.scala | 2 + .../sql/catalyst/expressions/WindowTime.scala | 62 .../scala/org/apache/spark/sql/functions.scala | 17 + .../sql-functions/sql-expression-schema.md | 1 + .../spark/sql/DataFrameTimeWindowingSuite.scala| 62 11 files changed, 555 insertions(+), 237 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 5a64845598e..37ddbaf1673 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -142,6 +142,7 @@ Datetime Functions window session_window timestamp_seconds +window_time Collection Functions diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index f01379afd6e..ad1bc488e87 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -4884,6 +4884,52 @@ def window( return _invoke_function("window", time_col, windowDuration) +def window_time( +windowColumn: "ColumnOrName", +) -> Column: +"""Computes the event time from a window column. The column window values are produced +by window aggregating operators and are of type `STRUCT` +where start is inclusive and end is exclusive. The event time of records produced by window +aggregating operators can be computed as ``window_time(window)`` and are +``window.end - lit(1).alias("microsecond")`` (as microsecond is the minimal supported event +time precision). The window column must be one produced by a window aggregating operator. + +.. 
versionadded:: 3.4.0 + +Parameters +-- +windowColumn : :class:`~pyspark.sql.Column` +The window column of a window aggregate records. + +Returns +--- +:class:`~pyspark.sql.Column` +the column for computed results. + +Examples + +>>> import datetime +>>> df = spark.createDataFrame( +... [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], +... ).toDF("date", "val") + +Group the data into 5 second time windows and aggregate as sum. + +>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) + +Extract the window event time using the window_time function. + +>>> w.select( +... w.window.end.cast("string").alias("end"), +... window_time(w.window).cast("string").alias("window_time"), +... "sum" +...
[spark] branch branch-3.3 updated: [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new e674356725d [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2 e674356725d is described below commit e674356725de1063760926e66c93dab4813a7aa8 Author: Cheng Pan AuthorDate: Sun Oct 23 11:37:42 2022 -0500 [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2 ### What changes were proposed in this pull request? Bump Jackson Databind from 2.13.4.1 to 2.13.4.2 ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? There is a regression about Gradle in 2.13.4.1 and got fixed in 2.13.4.2 https://github.com/FasterXML/jackson-databind/issues/3627 ### How was this patch tested? Existing UT. Closes #38355 from pan3793/SPARK-40886. Authored-by: Cheng Pan Signed-off-by: Sean Owen (cherry picked from commit e73b157f5c4d20c49ec0e3a7bd82a72d3271f766) Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index c7a7b3cbce9..d517d556feb 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -115,7 +115,7 @@ ivy/2.5.0//ivy-2.5.0.jar jackson-annotations/2.13.4//jackson-annotations-2.13.4.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.13.4//jackson-core-2.13.4.jar -jackson-databind/2.13.4.1//jackson-databind-2.13.4.1.jar +jackson-databind/2.13.4.2//jackson-databind-2.13.4.2.jar jackson-dataformat-cbor/2.13.4//jackson-dataformat-cbor-2.13.4.jar jackson-dataformat-yaml/2.13.4//jackson-dataformat-yaml-2.13.4.jar jackson-datatype-jsr310/2.13.4//jackson-datatype-jsr310-2.13.4.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 259efd760e2..54e7fe23e5b 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -105,7 +105,7 @@ ivy/2.5.0//ivy-2.5.0.jar jackson-annotations/2.13.4//jackson-annotations-2.13.4.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.13.4//jackson-core-2.13.4.jar -jackson-databind/2.13.4.1//jackson-databind-2.13.4.1.jar +jackson-databind/2.13.4.2//jackson-databind-2.13.4.2.jar jackson-dataformat-cbor/2.13.4//jackson-dataformat-cbor-2.13.4.jar jackson-dataformat-yaml/2.13.4//jackson-dataformat-yaml-2.13.4.jar jackson-datatype-jsr310/2.13.4//jackson-datatype-jsr310-2.13.4.jar diff --git a/pom.xml b/pom.xml index 2804a215fd9..d6b20512f6d 100644 --- a/pom.xml +++ b/pom.xml @@ -172,7 +172,7 @@ true 1.9.13 2.13.4 - 2.13.4.1 + 2.13.4.2 1.1.8.4 1.1.2 2.2.1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e73b157f5c4 [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2 e73b157f5c4 is described below commit e73b157f5c4d20c49ec0e3a7bd82a72d3271f766 Author: Cheng Pan AuthorDate: Sun Oct 23 11:37:42 2022 -0500 [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2 ### What changes were proposed in this pull request? Bump Jackson Databind from 2.13.4.1 to 2.13.4.2 ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? There is a regression about Gradle in 2.13.4.1 and got fixed in 2.13.4.2 https://github.com/FasterXML/jackson-databind/issues/3627 ### How was this patch tested? Existing UT. Closes #38355 from pan3793/SPARK-40886. Authored-by: Cheng Pan Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 1d1061aaadb..6756dd58312 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -113,7 +113,7 @@ ivy/2.5.0//ivy-2.5.0.jar jackson-annotations/2.13.4//jackson-annotations-2.13.4.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.13.4//jackson-core-2.13.4.jar -jackson-databind/2.13.4.1//jackson-databind-2.13.4.1.jar +jackson-databind/2.13.4.2//jackson-databind-2.13.4.2.jar jackson-dataformat-cbor/2.13.4//jackson-dataformat-cbor-2.13.4.jar jackson-dataformat-yaml/2.13.4//jackson-dataformat-yaml-2.13.4.jar jackson-datatype-jsr310/2.13.4//jackson-datatype-jsr310-2.13.4.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 39a0e617058..d29a10c1230 100644 --- 
a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -101,7 +101,7 @@ ivy/2.5.0//ivy-2.5.0.jar jackson-annotations/2.13.4//jackson-annotations-2.13.4.jar jackson-core-asl/1.9.13//jackson-core-asl-1.9.13.jar jackson-core/2.13.4//jackson-core-2.13.4.jar -jackson-databind/2.13.4.1//jackson-databind-2.13.4.1.jar +jackson-databind/2.13.4.2//jackson-databind-2.13.4.2.jar jackson-dataformat-cbor/2.13.4//jackson-dataformat-cbor-2.13.4.jar jackson-dataformat-yaml/2.13.4//jackson-dataformat-yaml-2.13.4.jar jackson-datatype-jsr310/2.13.4//jackson-datatype-jsr310-2.13.4.jar diff --git a/pom.xml b/pom.xml index d933c1c6f6d..78936392b85 100644 --- a/pom.xml +++ b/pom.xml @@ -176,7 +176,7 @@ true 1.9.13 2.13.4 - 2.13.4.1 + 2.13.4.2 1.1.8.4 3.0.2 1.15 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40760][SQL] Migrate type check failures of interval expressions onto error classes
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 625f76dae0d [SPARK-40760][SQL] Migrate type check failures of interval expressions onto error classes 625f76dae0d is described below commit 625f76dae0d9581428d6c5c4b58bf2958957c8c8 Author: Max Gekk AuthorDate: Sun Oct 23 13:32:34 2022 +0500 [SPARK-40760][SQL] Migrate type check failures of interval expressions onto error classes ### What changes were proposed in this pull request? In the PR, I propose to add new error sub-classes of the error class `DATATYPE_MISMATCH`, and use it in the case of type check failures of some interval expressions. ### Why are the changes needed? Migration onto error classes unifies Spark SQL error messages, and improves search-ability of errors. ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *AnalysisSuite" $ build/sbt "test:testOnly *ExpressionTypeCheckingSuite" $ build/sbt "test:testOnly *ApproxCountDistinctForIntervalsSuite" ``` Closes #38237 from MaxGekk/type-check-fails-interval-exprs. 
Authored-by: Max Gekk Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 +++ .../ApproxCountDistinctForIntervals.scala | 31 +++--- .../catalyst/expressions/aggregate/Average.scala | 2 +- .../sql/catalyst/expressions/aggregate/Sum.scala | 2 +- .../apache/spark/sql/catalyst/util/TypeUtils.scala | 20 + .../apache/spark/sql/types/AbstractDataType.scala | 9 .../sql/catalyst/analysis/AnalysisSuite.scala | 50 ++ .../analysis/ExpressionTypeCheckingSuite.scala | 26 +-- .../ApproxCountDistinctForIntervalsSuite.scala | 21 ++--- 9 files changed, 123 insertions(+), 43 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 5f4db145479..0f9b665718c 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -263,6 +263,11 @@ "The must be between (current value = )" ] }, + "WRONG_NUM_ENDPOINTS" : { +"message" : [ + "The number of endpoints must be >= 2 to construct intervals but the actual number is ." +] + }, "WRONG_NUM_PARAMS" : { "message" : [ "The requires parameters but the actual number is ." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala index f3bf251ba0b..0be4e4aa465 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala @@ -21,10 +21,11 @@ import java.util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, GenericInternalRow} import org.apache.spark.sql.catalyst.trees.BinaryLike import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform @@ -49,7 +50,10 @@ case class ApproxCountDistinctForIntervals( relativeSD: Double = 0.05, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0) - extends TypedImperativeAggregate[Array[Long]] with ExpectsInputTypes with BinaryLike[Expression] { + extends TypedImperativeAggregate[Array[Long]] + with ExpectsInputTypes + with BinaryLike[Expression] + with QueryErrorsBase { def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { this( @@ -77,19 +81,32 @@ case class ApproxCountDistinctForIntervals( if (defaultCheck.isFailure) { defaultCheck } else if (!endpointsExpression.foldable) { - TypeCheckFailure("The endpoints provided must be constant literals") + DataTypeMismatch( 
+errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" ->
[spark] branch master updated: [SPARK-40756][SQL] Migrate type check failures of string expressions onto error classes
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f81c26579e4 [SPARK-40756][SQL] Migrate type check failures of string expressions onto error classes f81c26579e4 is described below commit f81c26579e48816b863ca113d24eca018c939541 Author: panbingkun AuthorDate: Sun Oct 23 13:28:17 2022 +0500 [SPARK-40756][SQL] Migrate type check failures of string expressions onto error classes ### What changes were proposed in this pull request? This PR replaces TypeCheckFailure by DataTypeMismatch in type checks in the string expressions, including: - regexpExpressions.scala (RegExpReplace) - stringExpressions.scala (Etl) ### Why are the changes needed? Migration onto error classes unifies Spark SQL error messages. ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. ### How was this patch tested? - Add new UT - Update existing UT - Pass GA. Closes #38299 from panbingkun/SPARK-40756. 
Authored-by: panbingkun Signed-off-by: Max Gekk --- .../catalyst/expressions/regexpExpressions.scala | 22 -- .../catalyst/expressions/stringExpressions.scala | 33 +++ .../expressions/RegexpExpressionsSuite.scala | 23 +- .../expressions/StringExpressionsSuite.scala | 49 ++ .../sql-tests/results/regexp-functions.sql.out | 34 ++- 5 files changed, 147 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala index 500c040dfe4..2d079220812 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -27,7 +27,8 @@ import org.apache.commons.text.StringEscapeUtils import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.Cast._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.trees.BinaryLike @@ -37,7 +38,6 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String - abstract class StringRegexExpression extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant with Predicate { @@ -594,14 +594,28 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio return defaultCheck } if (!pos.foldable) { - return TypeCheckFailure(s"Position expression must be foldable, but got $pos") + return DataTypeMismatch( +errorSubClass = 
"NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> "position", + "inputType" -> toSQLType(pos.dataType), + "inputExpr" -> toSQLExpr(pos) +) + ) } val posEval = pos.eval() if (posEval == null || posEval.asInstanceOf[Int] > 0) { TypeCheckSuccess } else { - TypeCheckFailure(s"Position expression must be positive, but got: $posEval") + DataTypeMismatch( +errorSubClass = "VALUE_OUT_OF_RANGE", +messageParameters = Map( + "exprName" -> "position", + "valueRange" -> s"(0, ${Int.MaxValue}]", + "currentValue" -> toSQLValue(posEval, pos.dataType) +) + ) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 1bc79f23846..6927c4cfa3c 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, FunctionRegistry, TypeCheckResult} +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.expressions.Cast._ import org.apache.spark.sql.catalyst.expressions.codegen._ import
[spark] branch master updated (6c009180a75 -> 9d81f6e7506)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 6c009180a75 [SPARK-37945][SQL][CORE] Use error classes in the execution errors of arithmetic ops add 9d81f6e7506 [SPARK-40391][SQL][TESTS] Test the error class UNSUPPORTED_FEATURE.JDBC_TRANSACTION No new revisions were added by this update. Summary of changes: .../org.mockito.plugins.MockMaker | 2 + .../sql/errors/QueryExecutionErrorsSuite.scala | 69 +- 2 files changed, 69 insertions(+), 2 deletions(-) copy examples/src/main/python/sql/__init__.py => sql/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker (97%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-37945][SQL][CORE] Use error classes in the execution errors of arithmetic ops
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6c009180a75 [SPARK-37945][SQL][CORE] Use error classes in the execution errors of arithmetic ops 6c009180a75 is described below commit 6c009180a75ae8e548ef4395211b13ee25ab60a9 Author: Khalid Mammadov AuthorDate: Sun Oct 23 11:44:49 2022 +0500 [SPARK-37945][SQL][CORE] Use error classes in the execution errors of arithmetic ops ### What changes were proposed in this pull request? Migrate the following errors in QueryExecutionErrors to use error classes: unscaledValueTooLargeForPrecisionError -> UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION decimalPrecisionExceedsMaxPrecisionError -> DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION integerOverflowError -> INTEGER_OVERFLOW outOfDecimalTypeRangeError -> OUT_OF_DECIMAL_TYPE_RANGE ### Why are the changes needed? Porting ArithmeticExceptions to the new error framework ### Does this PR introduce _any_ user-facing change? Yes, errors will indicate that it's a controlled Spark exception ### How was this patch tested? ./build/sbt "catalyst/testOnly org.apache.spark.sql.types.DecimalSuite" ./build/sbt "sql/testOnly org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite" ./build/sbt "core/testOnly testOnly org.apache.spark.SparkThrowableSuite" Closes #38273 from khalidmammadov/error_class2. 
Lead-authored-by: Khalid Mammadov Co-authored-by: khalidmammadov Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 40 ++-- .../spark/sql/errors/QueryExecutionErrors.scala| 41 + .../catalyst/expressions/CastWithAnsiOnSuite.scala | 4 +- .../org/apache/spark/sql/types/DecimalSuite.scala | 53 +- .../sources/RateStreamMicroBatchStream.scala | 8 ++-- .../sources/RateStreamProviderSuite.scala | 44 +- 6 files changed, 145 insertions(+), 45 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 240cf5f4eea..5f4db145479 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -276,6 +276,11 @@ ], "sqlState" : "22008" }, + "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION" : { +"message" : [ + "Decimal precision exceeds max precision ." +] + }, "DEFAULT_DATABASE_NOT_EXISTS" : { "message" : [ "Default database does not exist, please create it first or change default database to 'default'." @@ -416,6 +421,16 @@ } } }, + "INCORRECT_END_OFFSET" : { +"message" : [ + "Max offset with rowsPerSecond is , but it's now." +] + }, + "INCORRECT_RUMP_UP_RATE" : { +"message" : [ + "Max offset with rowsPerSecond is , but 'rampUpTimeSeconds' is ." +] + }, "INDEX_ALREADY_EXISTS" : { "message" : [ "Cannot create the index because it already exists. ." @@ -605,6 +620,11 @@ ], "sqlState" : "22005" }, + "OUT_OF_DECIMAL_TYPE_RANGE" : { +"message" : [ + "Out of decimal type range: ." +] + }, "PARSE_CHAR_MISSING_LENGTH" : { "message" : [ "DataType requires a length parameter, for example (10). Please specify the length." @@ -814,6 +834,11 @@ }, "sqlState" : "42000" }, + "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION" : { +"message" : [ + "Unscaled value too large for precision. If necessary set to false to bypass this error." 
+] + }, "UNSUPPORTED_DATATYPE" : { "message" : [ "Unsupported data type " @@ -3707,21 +3732,6 @@ "Unexpected: " ] }, - "_LEGACY_ERROR_TEMP_2117" : { -"message" : [ - "Unscaled value too large for precision. If necessary set to false to bypass this error." -] - }, - "_LEGACY_ERROR_TEMP_2118" : { -"message" : [ - "Decimal precision exceeds max precision " -] - }, - "_LEGACY_ERROR_TEMP_2119" : { -"message" : [ - "out of decimal type range: " -] - }, "_LEGACY_ERROR_TEMP_2120" : { "message" : [ "Do not support array of type ." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 5edffc87b84..4aedfb3b03d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1260,8 +1260,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { def unscaledValueTooLargeForPrecisionError(): SparkArithmeticException = { new