[jira] [Created] (SPARK-48150) Fix nullability of try_parse_json
Josh Rosen created SPARK-48150: -- Summary: Fix nullability of try_parse_json Key: SPARK-48150 URL: https://issues.apache.org/jira/browse/SPARK-48150 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Josh Rosen Assignee: Josh Rosen Followup for SPARK-47922: `try_parse_json` must declare a nullable output. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
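The nullability requirement follows from the function's contract: `try_`-prefixed variants return NULL instead of raising on malformed input, so the output column must be nullable even when the input is not. A minimal Python sketch of that contract (plain `json.loads` stands in for Spark's variant parser; this is an illustration, not Spark's implementation):

```python
import json

def try_parse_json(s):
    # try_* functions swallow parse errors and yield NULL (None here),
    # which is why the declared output type must be nullable.
    try:
        return json.loads(s)
    except (TypeError, ValueError):
        return None
```

Malformed input and NULL input both map to NULL rather than an error.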
[jira] [Created] (SPARK-48128) BitwiseCount / bit_count generated code for boolean inputs fails to compile
Josh Rosen created SPARK-48128: -- Summary: BitwiseCount / bit_count generated code for boolean inputs fails to compile Key: SPARK-48128 URL: https://issues.apache.org/jira/browse/SPARK-48128 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Josh Rosen Assignee: Josh Rosen If the `BitwiseCount` / `bit_count` expression is applied to a boolean-typed column, then it will trigger codegen fallback to interpreted execution because the generated code contains invalid Java syntax. This problem was masked because the QueryTest framework may not be fully exercising codegen paths (e.g. if constant folding occurs).
[jira] [Updated] (SPARK-48128) BitwiseCount / bit_count generated code for boolean inputs fails to compile
[ https://issues.apache.org/jira/browse/SPARK-48128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-48128: --- Description: If the `BitwiseCount` / `bit_count` expression is applied to a boolean-typed column, then it will trigger codegen fallback to interpreted execution because the generated code contains invalid Java syntax, triggering errors like {code} java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 11: Failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 41, Column 11: Unexpected token "if" in primary {code} This problem was masked because the QueryTest framework may not be fully exercising codegen paths (e.g. if constant folding occurs). was: If the `BitwiseCount` / `bit_count` expression is applied to a boolean-typed column, then it will trigger codegen fallback to interpreted execution because the generated code contains invalid Java syntax. This problem was masked because the QueryTest framework may not be fully exercising codegen paths (e.g. if constant folding occurs).
> BitwiseCount / bit_count generated code for boolean inputs fails to compile > --- > > Key: SPARK-48128 > URL: https://issues.apache.org/jira/browse/SPARK-48128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > If the `BitwiseCount` / `bit_count` expression is applied to a boolean-typed > column, then it will trigger codegen fallback to interpreted execution because the > generated code contains invalid Java syntax, triggering errors like > {code} > java.util.concurrent.ExecutionException: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 41, Column 11: Failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 41, Column 11: Unexpected token "if" in primary > {code} > This problem was masked because the QueryTest framework may not be fully > exercising codegen paths (e.g. if constant folding occurs).
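For reference, `bit_count` is a population count; for a boolean input the expected results are 1 for true and 0 for false. A rough Python model of the intended semantics (modeling negative integers as 64-bit two's complement is an assumption, based on Spark treating integral inputs by their binary representation):

```python
def bit_count(value):
    # Booleans count as a single bit: True -> 1, False -> 0.
    # Check bool first, since bool is a subclass of int in Python.
    if isinstance(value, bool):
        return 1 if value else 0
    if value < 0:
        # Assumption: negative values use 64-bit two's complement, as for LongType.
        value &= (1 << 64) - 1
    return bin(value).count("1")
```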
[jira] [Updated] (SPARK-48081) Fix ClassCastException in NTile.checkInputDataTypes() when argument is non-foldable or of wrong type
[ https://issues.apache.org/jira/browse/SPARK-48081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-48081: --- Summary: Fix ClassCastException in NTile.checkInputDataTypes() when argument is non-foldable or of wrong type (was: Fix ClassCastException in NTile.checkInputDataTypes() when input data type is mismatched or non-foldable ) > Fix ClassCastException in NTile.checkInputDataTypes() when argument is > non-foldable or of wrong type > > > Key: SPARK-48081 > URL: https://issues.apache.org/jira/browse/SPARK-48081 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Labels: pull-request-available > > {code:java} > sql("select ntile(99.9) OVER (order by id) from range(10)"){code} > results in > {code} > java.lang.ClassCastException: class org.apache.spark.sql.types.Decimal > cannot be cast to class java.lang.Integer (org.apache.spark.sql.types.Decimal > is in unnamed module of loader 'app'; java.lang.Integer is in module > java.base of loader 'bootstrap') > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:99) > at > org.apache.spark.sql.catalyst.expressions.NTile.checkInputDataTypes(windowExpressions.scala:877) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1(Expression.scala:279) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1$adapted(Expression.scala:279) > at scala.collection.IterableOnceOps.forall(IterableOnce.scala:633) > at scala.collection.IterableOnceOps.forall$(IterableOnce.scala:630) > at scala.collection.AbstractIterable.forall(Iterable.scala:935) > at > 
org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:279) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$157.applyOrElse(Analyzer.scala:2243) > > {code} > instead of the intended user-facing error message. This is a minor bug that > was introduced in a previous error class refactoring PR.
[jira] [Updated] (SPARK-48081) Fix ClassCastException in NTile.checkInputDataTypes() when input data type is mismatched or non-foldable
[ https://issues.apache.org/jira/browse/SPARK-48081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-48081: --- Summary: Fix ClassCastException in NTile.checkInputDataTypes() when input data type is mismatched or non-foldable (was: Fix ClassCastException in NTile.checkInputDataTypes() when data type is mismatched) > Fix ClassCastException in NTile.checkInputDataTypes() when input data type is > mismatched or non-foldable > - > > Key: SPARK-48081 > URL: https://issues.apache.org/jira/browse/SPARK-48081 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Labels: pull-request-available > > {code:java} > sql("select ntile(99.9) OVER (order by id) from range(10)"){code} > results in > {code} > java.lang.ClassCastException: class org.apache.spark.sql.types.Decimal > cannot be cast to class java.lang.Integer (org.apache.spark.sql.types.Decimal > is in unnamed module of loader 'app'; java.lang.Integer is in module > java.base of loader 'bootstrap') > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:99) > at > org.apache.spark.sql.catalyst.expressions.NTile.checkInputDataTypes(windowExpressions.scala:877) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1(Expression.scala:279) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1$adapted(Expression.scala:279) > at scala.collection.IterableOnceOps.forall(IterableOnce.scala:633) > at scala.collection.IterableOnceOps.forall$(IterableOnce.scala:630) > at scala.collection.AbstractIterable.forall(Iterable.scala:935) > at > org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:279) > at > 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$157.applyOrElse(Analyzer.scala:2243) > > {code} > instead of the intended user-facing error message. This is a minor bug that > was introduced in a previous error class refactoring PR.
[jira] [Updated] (SPARK-48081) Fix ClassCastException in NTile.checkInputDataTypes() when data type is mismatched
[ https://issues.apache.org/jira/browse/SPARK-48081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-48081: --- Summary: Fix ClassCastException in NTile.checkInputDataTypes() when data type is mismatched (was: Fix ClassCastException in NTile.checkInputDataTypes()) > Fix ClassCastException in NTile.checkInputDataTypes() when data type is > mismatched > -- > > Key: SPARK-48081 > URL: https://issues.apache.org/jira/browse/SPARK-48081 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > {code:java} > sql("select ntile(99.9) OVER (order by id) from range(10)"){code} > results in > {code} > java.lang.ClassCastException: class org.apache.spark.sql.types.Decimal > cannot be cast to class java.lang.Integer (org.apache.spark.sql.types.Decimal > is in unnamed module of loader 'app'; java.lang.Integer is in module > java.base of loader 'bootstrap') > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:99) > at > org.apache.spark.sql.catalyst.expressions.NTile.checkInputDataTypes(windowExpressions.scala:877) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:267) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1(Expression.scala:279) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1$adapted(Expression.scala:279) > at scala.collection.IterableOnceOps.forall(IterableOnce.scala:633) > at scala.collection.IterableOnceOps.forall$(IterableOnce.scala:630) > at scala.collection.AbstractIterable.forall(Iterable.scala:935) > at > org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:279) > at > 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$157.applyOrElse(Analyzer.scala:2243) > > {code} > instead of the intended user-facing error message. This is a minor bug that > was introduced in a previous error class refactoring PR.
[jira] [Created] (SPARK-48081) Fix ClassCastException in NTile.checkInputDataTypes()
Josh Rosen created SPARK-48081: -- Summary: Fix ClassCastException in NTile.checkInputDataTypes() Key: SPARK-48081 URL: https://issues.apache.org/jira/browse/SPARK-48081 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Josh Rosen Assignee: Josh Rosen {code:java} sql("select ntile(99.9) OVER (order by id) from range(10)"){code} results in {code} java.lang.ClassCastException: class org.apache.spark.sql.types.Decimal cannot be cast to class java.lang.Integer (org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app'; java.lang.Integer is in module java.base of loader 'bootstrap') at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:99) at org.apache.spark.sql.catalyst.expressions.NTile.checkInputDataTypes(windowExpressions.scala:877) at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:267) at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:267) at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1(Expression.scala:279) at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$childrenResolved$1$adapted(Expression.scala:279) at scala.collection.IterableOnceOps.forall(IterableOnce.scala:633) at scala.collection.IterableOnceOps.forall$(IterableOnce.scala:630) at scala.collection.AbstractIterable.forall(Iterable.scala:935) at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:279) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$22$$anonfun$applyOrElse$157.applyOrElse(Analyzer.scala:2243) {code} instead of the intended user-facing error message. This is a minor bug that was introduced in a previous error class refactoring PR.
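The fix direction is for `checkInputDataTypes()` to validate the argument before unboxing it, so the user gets an analysis error instead of a `ClassCastException`. A simplified Python sketch of that check ordering (the error-code names here are hypothetical stand-ins, not Spark's actual error classes):

```python
def check_ntile_input(value, foldable, is_int_type):
    # Validate in an order that never unboxes an unchecked value:
    # foldability first, then the data type, and only then the
    # integer range check (which assumes the value is an int).
    if not foldable:
        return "NON_FOLDABLE_INPUT"
    if not is_int_type:
        return "UNEXPECTED_INPUT_TYPE"
    if value <= 0:  # safe: value is known to be an int at this point
        return "VALUE_OUT_OF_RANGE"
    return "SUCCESS"
```

With the buggy ordering, `ntile(99.9)` reaches the unboxing step with a Decimal and throws; with this ordering it returns a type-mismatch error.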
[jira] [Created] (SPARK-47734) Fix flaky pyspark.sql.dataframe.DataFrame.writeStream doctest by stopping streaming query
Josh Rosen created SPARK-47734: -- Summary: Fix flaky pyspark.sql.dataframe.DataFrame.writeStream doctest by stopping streaming query Key: SPARK-47734 URL: https://issues.apache.org/jira/browse/SPARK-47734 Project: Spark Issue Type: Improvement Components: PySpark, Tests Affects Versions: 4.0.0 Reporter: Josh Rosen Assignee: Josh Rosen https://issues.apache.org/jira/browse/SPARK-47199 didn't fix the flakiness in the pyspark.sql.dataframe.DataFrame.writeStream doctest: the problem is not a collision between concurrent tests but, rather, that the test starts a background thread to write to a directory and then deletes that directory from the main test thread, which is inherently race-prone. The fix is simple: stop the streaming query in the doctest itself, similar to other streaming doctest examples.
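The race is generic and can be shown without Spark: a background writer keeps touching a directory while the main thread deletes it. A plain-Python sketch of the fix, where stopping and joining the writer (the analogue of `query.stop()` in the doctest) happens before the directory is removed:

```python
import os
import shutil
import tempfile
import threading

stop = threading.Event()

def background_writer(path):
    # Stand-in for the streaming query's background file writer.
    while not stop.is_set():
        with open(os.path.join(path, "part-0"), "w") as f:
            f.write("data")

out_dir = tempfile.mkdtemp()
writer = threading.Thread(target=background_writer, args=(out_dir,))
writer.start()

# The fix: halt the writer *before* deleting its output directory,
# instead of racing the deletion against ongoing writes.
stop.set()
writer.join()
shutil.rmtree(out_dir)
```

Deleting the directory while the writer is still running would intermittently fail or leave stray files, which is exactly the flakiness described above.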
[jira] [Resolved] (SPARK-47068) Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch
[ https://issues.apache.org/jira/browse/SPARK-47068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-47068. Resolution: Fixed Marking this issue as fixed. > Recover -1 and 0 case for spark.sql.execution.arrow.maxRecordsPerBatch > -- > > Key: SPARK-47068 > URL: https://issues.apache.org/jira/browse/SPARK-47068 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.4.1, 3.5.0, 4.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Critical > Labels: pull-request-available > Fix For: 4.0.0, 3.5.2, 3.4.3 > > > {code} > import pandas as pd > spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") > spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 0) > spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", False) > spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() > spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", -1) > spark.createDataFrame(pd.DataFrame({'a': [123]})).toPandas() > {code} > {code} > /.../spark/python/pyspark/sql/pandas/conversion.py:371: UserWarning: > createDataFrame attempted Arrow optimization because > 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached > the error below and will not continue because automatic fallback with > 'spark.sql.execution.arrow.pyspark.fallback.enabled' has been set to false. 
> range() arg 3 must not be zero > warn(msg) > Traceback (most recent call last): > File "", line 1, in > File "/.../spark/python/pyspark/sql/session.py", line 1483, in > createDataFrame > return super(SparkSession, self).createDataFrame( # type: > ignore[call-overload] > File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 351, in > createDataFrame > return self._create_from_pandas_with_arrow(data, schema, timezone) > File "/.../spark/python/pyspark/sql/pandas/conversion.py", line 633, in > _create_from_pandas_with_arrow > pdf_slices = (pdf.iloc[start : start + step] for start in range(0, > len(pdf), step)) > ValueError: range() arg 3 must not be zero > {code} > {code} > Empty DataFrame > Columns: [a] > Index: [] > {code}
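The traceback shows the root cause: `range(0, len(pdf), step)` receives a step of 0. A small Python sketch of the recovered behavior, where non-positive settings are treated as "no per-batch limit" (that reading of -1 and 0 is an assumption based on the issue title, not a quote of the patch):

```python
def batch_starts(n_rows, max_records_per_batch):
    # Treat 0 and negative settings as "unlimited": one batch for all rows.
    # This also guarantees step > 0, so range() never sees a zero step.
    step = max_records_per_batch if max_records_per_batch > 0 else max(n_rows, 1)
    return list(range(0, n_rows, step))
```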
[jira] [Comment Edited] (SPARK-46251) Spark 3.3.3 tuple encoders built using Encoders.tuple do not correctly cast null into None for Option values
[ https://issues.apache.org/jira/browse/SPARK-46251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826945#comment-17826945 ] Josh Rosen edited comment on SPARK-46251 at 3/14/24 5:34 AM: - FYI, this looks like it was duplicated by https://issues.apache.org/jira/browse/SPARK-47385 which now has a PR open to fix it. was (Author: joshrosen): FYI, this looks like a duplicate of https://issues.apache.org/jira/browse/SPARK-47385 which now has a PR open to fix it. > Spark 3.3.3 tuple encoders built using Encoders.tuple do not correctly cast > null into None for Option values > > > Key: SPARK-46251 > URL: https://issues.apache.org/jira/browse/SPARK-46251 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.3, 3.4.2, 3.4.0, 3.4.1, 3.5.0 >Reporter: Will Boulter >Priority: Major > > In Spark {{3.3.2}} encoders created using {{Encoders.tuple(encoder1, > encoder2, ..)}} correctly handle casting {{null}} into {{None}} when the > target type is an Option. > In Spark {{{}3.3.3{}}}, this behaviour has changed and the Option value comes > through as {{null}} which is likely to cause a {{NullPointerException}} for > most Scala code that operates on the Option. The change seems to be related > to the following commit: > [https://github.com/apache/spark/commit/9110c05d54c392e55693eba4509be37c571d610a] > I have made a reproduction with a couple of examples in a public Github repo > here: > [https://github.com/q-willboulter/spark-tuple-encoders-bug] > The common use case where this is likely to be encountered is while doing any > joins that can return null, e.g. left or outer joins. When casting the result > of a left join it is sensible to wrap the right-hand side in an Option to > handle the case where there is no match. 
Since 3.3.3 this would fail if the > encoder is derived manually using {{Encoders.tuple(leftEncoder, > rightEncoder).}} > If the entire tuple encoder {{Encoder[(Left, Option[Right]])}} is derived at > once using reflection, the encoder works as expected. The bug appears to be > in the following function inside {{ExpressionEncoder.scala}} > {code:java} > def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = > ...{code} >
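The shape of this bug can be modeled without Spark: a composed tuple decoder must route every field through that field's own decoder, so an Option-typed field can turn NULL into `None`. A toy Python sketch (a deliberate simplification of what `ExpressionEncoder.tuple` composes; Scala's `Option` is modeled as tagged tuples):

```python
def compose_tuple_decoder(element_decoders):
    # Correct composition: each field value goes through its element's
    # decoder, preserving per-field null handling. The reported bug is
    # the effect of skipping this step, so raw nulls leak through.
    def decode(row):
        return tuple(dec(value) for dec, value in zip(element_decoders, row))
    return decode

# Model Option as tagged tuples: ("None",) for None, ("Some", v) otherwise.
decode_option = lambda v: ("None",) if v is None else ("Some", v)
decode_plain = lambda v: v
```

A left-join row `("left", NULL)` then decodes to `("left", None-the-Option)` rather than a bare null.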
[jira] [Commented] (SPARK-46251) Spark 3.3.3 tuple encoders built using Encoders.tuple do not correctly cast null into None for Option values
[ https://issues.apache.org/jira/browse/SPARK-46251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826945#comment-17826945 ] Josh Rosen commented on SPARK-46251: FYI, this looks like a duplicate of https://issues.apache.org/jira/browse/SPARK-47385 which now has a PR open to fix it. > Spark 3.3.3 tuple encoders built using Encoders.tuple do not correctly cast > null into None for Option values > > > Key: SPARK-46251 > URL: https://issues.apache.org/jira/browse/SPARK-46251 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.3, 3.4.2, 3.4.0, 3.4.1, 3.5.0 >Reporter: Will Boulter >Priority: Major > > In Spark {{3.3.2}} encoders created using {{Encoders.tuple(encoder1, > encoder2, ..)}} correctly handle casting {{null}} into {{None}} when the > target type is an Option. > In Spark {{{}3.3.3{}}}, this behaviour has changed and the Option value comes > through as {{null}} which is likely to cause a {{NullPointerException}} for > most Scala code that operates on the Option. The change seems to be related > to the following commit: > [https://github.com/apache/spark/commit/9110c05d54c392e55693eba4509be37c571d610a] > I have made a reproduction with a couple of examples in a public Github repo > here: > [https://github.com/q-willboulter/spark-tuple-encoders-bug] > The common use case where this is likely to be encountered is while doing any > joins that can return null, e.g. left or outer joins. When casting the result > of a left join it is sensible to wrap the right-hand side in an Option to > handle the case where there is no match. Since 3.3.3 this would fail if the > encoder is derived manually using {{Encoders.tuple(leftEncoder, > rightEncoder).}} > If the entire tuple encoder {{Encoder[(Left, Option[Right]])}} is derived at > once using reflection, the encoder works as expected. 
The bug appears to be > in the following function inside {{ExpressionEncoder.scala}} > {code:java} > def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = > ...{code} >
[jira] [Created] (SPARK-47121) Avoid noisy RejectedExecutionExceptions during StandaloneSchedulerBackend shutdown
Josh Rosen created SPARK-47121: -- Summary: Avoid noisy RejectedExecutionExceptions during StandaloneSchedulerBackend shutdown Key: SPARK-47121 URL: https://issues.apache.org/jira/browse/SPARK-47121 Project: Spark Issue Type: Improvement Components: Scheduler Affects Versions: 3.5.0 Reporter: Josh Rosen Assignee: Josh Rosen While it is in the process of shutting down, the StandaloneSchedulerBackend might throw RejectedExecutionExceptions when RPC handler `onDisconnected` methods attempt to submit new tasks to a stopped executorDelayRemoveThread executor service. We can reduce log and uncaught exception noise by catching and ignoring these exceptions if they occur during shutdown.
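The same pattern exists in Python's executor API, where submitting to a shut-down pool raises `RuntimeError` (the analogue of Java's `RejectedExecutionException`). A sketch of the proposed catch-and-ignore behavior, in plain Python rather than Spark's Scala:

```python
from concurrent.futures import ThreadPoolExecutor

def submit_ignoring_shutdown(pool, fn, *args):
    # During shutdown, late callbacks may still try to schedule work;
    # swallow the rejection instead of surfacing a noisy uncaught exception.
    try:
        return pool.submit(fn, *args)
    except RuntimeError:
        return None

pool = ThreadPoolExecutor(max_workers=1)
pool.shutdown(wait=True)
result = submit_ignoring_shutdown(pool, print, "late task")  # rejected, ignored
```

The trade-off is that the rejection is silently dropped, which is acceptable here because the work is pointless once shutdown has begun.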
[jira] [Updated] (SPARK-46862) Incorrect count() of a dataframe loaded from CSV datasource
[ https://issues.apache.org/jira/browse/SPARK-46862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-46862: --- Labels: correctness pull-request-available (was: pull-request-available) > Incorrect count() of a dataframe loaded from CSV datasource > --- > > Key: SPARK-46862 > URL: https://issues.apache.org/jira/browse/SPARK-46862 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 4.0.0 >Reporter: Max Gekk >Assignee: Max Gekk >Priority: Major > Labels: correctness, pull-request-available > Fix For: 4.0.0, 3.5.1, 3.4.3 > > Attachments: es-939111-data.csv > > > The example below portrays the issue: > {code:java} > >>> df=spark.read.option("multiline", "true").option("header", > >>> "true").option("escape", '"').csv("es-939111-data.csv") > >>> df.count() > 4 > >>> df.cache() > DataFrame[jobID: string, Name: string, City: string, Active: string] > >>> df.count() > 5{code}
[jira] [Commented] (SPARK-46365) Spark 3.5.0 Regression: Window Function Combination Yields Null Values
[ https://issues.apache.org/jira/browse/SPARK-46365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796054#comment-17796054 ] Josh Rosen commented on SPARK-46365: I think that this is a duplicate of SPARK-45543, which is fixed in the forthcoming Spark 3.5.1. I figured this out by running `git bisect` in `branch-3.5` with the above reproduction. > Spark 3.5.0 Regression: Window Function Combination Yields Null Values > --- > > Key: SPARK-46365 > URL: https://issues.apache.org/jira/browse/SPARK-46365 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Boris PEREVALOV >Priority: Major > > When combining two window functions (first one to get the previous non-null > value, second one to get the latest rows only), the result is not correct > since version 3.5.0. > > Here is a simple Scala example: > > {code:java} > import org.apache.spark.sql.expressions.Window > import org.apache.spark.sql.functions._ > > case class Event(timestamp: Long, id: Long, value: String) > > val events = Seq( > Event(timestamp = 1702289001, id = 1 , value = "non-null value"), > Event(timestamp = 1702289002, id = 1 , value = "new non-null value"), > Event(timestamp = 1702289003, id = 1 , value = null), > Event(timestamp = 1702289004, id = 2 , value = "non-null value"), > Event(timestamp = 1702289005, id = 2 , value = null), > ).toDF > > val window = Window.partitionBy("id").orderBy($"timestamp".desc) > > val eventsWithLatestNonNullValue = events > .withColumn( > "value", > first("value", ignoreNulls = true) over > window.rangeBetween(Window.currentRow, Window.unboundedFollowing) > ) > > eventsWithLatestNonNullValue.show > > val latestEvents = eventsWithLatestNonNullValue > .withColumn("n", row_number over window) > .where("n = 1") > .drop("n") > > latestEvents.show > {code} > > > Current result (Spark 3.5.0) > > {code:java} > +--+---+-+ > | timestamp| id|value| > +--+---+-+ > |1702289003| 1| NULL| > |1702289005| 2| NULL| > 
+--+---+-+ > {code} > > > Expected result (all versions < 3.5.0): > > {code:java} > +--+---+--+ > | timestamp| id| value| > +--+---+--+ > |1702289003| 1|new non-null value| > |1702289005| 2| non-null value| > +--+---+--+ > {code} > > Execution plans are different. > > Spark 3.5.0: > > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- Project [timestamp#1856L, id#1857L, value#1867] > +- Filter (n#1887 = 1) > +- Window [first(value#1858, true) windowspecdefinition(id#1857L, > timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RangeFrame, > currentrow$(), unboundedfollowing$())) AS value#1867, row_number() > windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, > specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS > n#1887], [id#1857L], [timestamp#1856L DESC NULLS LAST] > +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], > row_number(), 1, Final > +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS > LAST], false, 0 > +- Exchange hashpartitioning(id#1857L, 200), > ENSURE_REQUIREMENTS, [plan_id=326] > +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS > LAST], row_number(), 1, Partial > +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC > NULLS LAST], false, 0 > +- LocalTableScan [timestamp#1856L, id#1857L, > value#1858] > {code} > > Spark 3.4.0: > > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- Project [timestamp#6L, id#7L, value#17] > +- Filter (n#37 = 1) > +- Window [first(value#8, true) windowspecdefinition(id#7L, > timestamp#6L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), > unboundedfollowing$())) AS value#17, row_number() windowspecdefinition(id#7L, > timestamp#6L DESC NULLS LAST, specifiedwindowframe(RowFrame, > unboundedpreceding$(), currentrow$())) AS n#37], [id#7L], [timestamp#6L DESC > NULLS LAST] > +- Sort [id#7L ASC NULLS FIRST, timestamp#6L DESC NULLS LAST], > false, 0 > +- Exchange 
hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, > [plan_id=60] > +- LocalTableScan [timestamp#6L, id#7L, value#8] > {code} >
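The expected output can be checked against a plain-Python model of the two window functions: per id, order by timestamp descending, take the first non-null value at or after each row, then keep only the row with row_number 1. This is an illustration of the intended semantics, not Spark code:

```python
def latest_rows_with_fill(events):
    # events: (timestamp, id, value) tuples; value may be None.
    by_id = {}
    for ts, ident, value in events:
        by_id.setdefault(ident, []).append((ts, value))
    result = {}
    for ident, rows in by_id.items():
        rows.sort(key=lambda r: r[0], reverse=True)  # timestamp DESC
        newest_ts = rows[0][0]
        # first("value", ignoreNulls=True) over (currentRow, unboundedFollowing),
        # evaluated at the newest row, scans that row and all older ones.
        filled = next((v for _, v in rows if v is not None), None)
        result[ident] = (newest_ts, filled)
    return result

events = [
    (1702289001, 1, "non-null value"),
    (1702289002, 1, "new non-null value"),
    (1702289003, 1, None),
    (1702289004, 2, "non-null value"),
    (1702289005, 2, None),
]
```

This reproduces the expected table above, which the Spark 3.5.0 plan (with the added WindowGroupLimit) fails to produce.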
[jira] [Commented] (SPARK-46365) Spark 3.5.0 Regression: Window Function Combination Yields Null Values
[ https://issues.apache.org/jira/browse/SPARK-46365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795932#comment-17795932 ] Josh Rosen commented on SPARK-46365: This issue appears to be fixed in [https://github.com/apache/spark/tree/branch-3.5] but I'm not sure by which commit. > Spark 3.5.0 Regression: Window Function Combination Yields Null Values > --- > > Key: SPARK-46365 > URL: https://issues.apache.org/jira/browse/SPARK-46365 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Boris PEREVALOV >Priority: Major > > When combining two window functions (first one to get the previous non-null > value, second one to get the latest rows only), the result is not correct > since version 3.5.0. > > Here is a simple Scala example: > > {code:java} > import org.apache.spark.sql.expressions.Window > import org.apache.spark.sql.functions._ > > case class Event(timestamp: Long, id: Long, value: String) > > val events = Seq( > Event(timestamp = 1702289001, id = 1 , value = "non-null value"), > Event(timestamp = 1702289002, id = 1 , value = "new non-null value"), > Event(timestamp = 1702289003, id = 1 , value = null), > Event(timestamp = 1702289004, id = 2 , value = "non-null value"), > Event(timestamp = 1702289005, id = 2 , value = null), > ).toDF > > val window = Window.partitionBy("id").orderBy($"timestamp".desc) > > val eventsWithLatestNonNullValue = events > .withColumn( > "value", > first("value", ignoreNulls = true) over > window.rangeBetween(Window.currentRow, Window.unboundedFollowing) > ) > > eventsWithLatestNonNullValue.show > > val latestEvents = eventsWithLatestNonNullValue > .withColumn("n", row_number over window) > .where("n = 1") > .drop("n") > > latestEvents.show > {code} > > > Current result (Spark 3.5.0) > > {code:java} > +--+---+-+ > | timestamp| id|value| > +--+---+-+ > |1702289003| 1| NULL| > |1702289005| 2| NULL| > +--+---+-+ > {code} > > > Expected result (all versions > 3.5.0): > > 
{code:java} > +--+---+--+ > | timestamp| id| value| > +--+---+--+ > |1702289003| 1|new non-null value| > |1702289005| 2| non-null value| > +--+---+--+ > {code} > > Execution plans are different. > > Spark 3.5.0: > > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- Project [timestamp#1856L, id#1857L, value#1867] > +- Filter (n#1887 = 1) > +- Window [first(value#1858, true) windowspecdefinition(id#1857L, > timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RangeFrame, > currentrow$(), unboundedfollowing$())) AS value#1867, row_number() > windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, > specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS > n#1887], [id#1857L], [timestamp#1856L DESC NULLS LAST] > +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], > row_number(), 1, Final > +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS > LAST], false, 0 > +- Exchange hashpartitioning(id#1857L, 200), > ENSURE_REQUIREMENTS, [plan_id=326] > +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS > LAST], row_number(), 1, Partial > +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC > NULLS LAST], false, 0 > +- LocalTableScan [timestamp#1856L, id#1857L, > value#1858] > {code} > > Spark 3.4.0: > > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- Project [timestamp#6L, id#7L, value#17] > +- Filter (n#37 = 1) > +- Window [first(value#8, true) windowspecdefinition(id#7L, > timestamp#6L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), > unboundedfollowing$())) AS value#17, row_number() windowspecdefinition(id#7L, > timestamp#6L DESC NULLS LAST, specifiedwindowframe(RowFrame, > unboundedpreceding$(), currentrow$())) AS n#37], [id#7L], [timestamp#6L DESC > NULLS LAST] > +- Sort [id#7L ASC NULLS FIRST, timestamp#6L DESC NULLS LAST], > false, 0 > +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, > [plan_id=60] > +- LocalTableScan 
[timestamp#6L, id#7L, value#8] > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46125) Memory leak when using createDataFrame with persist
[ https://issues.apache.org/jira/browse/SPARK-46125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790751#comment-17790751 ] Josh Rosen commented on SPARK-46125: I think that this issue relates specifically to `createDataFrame` and other mechanisms for creating Datasets or RDDs from driver-side data. I was able to reproduce the memory effects that you reported using a synthetic dataset: {code:python}
import numpy as np
import pandas as pd

n_rows = 100
n_cols = 100  # assumed; the original snippet used n_cols without showing its definition
data = np.random.randn(n_rows, n_cols)
pdf = pd.DataFrame(data, columns=[f'Column_{i}' for i in range(n_cols)])
{code} I took heap dumps in the "with unpersist" and "without unpersist" cases and saw that most of the difference was due to `byte[]` arrays. That, in turn, is due to ParallelCollectionPartitions being kept alive in a ParallelCollectionRDD that is retained by the CacheManager. When you cache a query, Spark keeps the physical query plan alive so that it can recompute cached data if it is lost (e.g. due to a node failure). For Datasets or RDDs that are created from data on the driver, that driver-side data is kept alive. It's this CacheManager reference to the physical plan which is keeping the source RDD from being cleaned: this is why `del df` followed by GC does not clean up the RDD's memory. --- If you use `localCheckpoint` then Spark will persist the data to disk and truncate the RDD lineage, thereby avoiding driver-side memory consumption from the parallel collection RDD, but this will have the side effect of removing fault-tolerance: if any node is lost then the data will be lost and any attempts to access it will result in query failures. !image-2023-11-28-12-55-58-461.png! 
> Memory leak when using createDataFrame with persist > --- > > Key: SPARK-46125 > URL: https://issues.apache.org/jira/browse/SPARK-46125 > Project: Spark > Issue Type: Bug > Components: Input/Output, PySpark >Affects Versions: 3.5.0 >Reporter: Arman Yazdani >Priority: Major > Labels: PySpark, memory-leak, persist > Attachments: CreateDataFrameWithUnpersist.png, > CreateDataFrameWithoutUnpersist.png, ReadParquetWithoutUnpersist.png, > image-2023-11-28-12-55-58-461.png > > > When I create a dataset from a pandas data frame and persist it (DISK_ONLY), > some "byte[]" objects (totalling the size of the imported data frame) still remain > in the driver's heap memory. > This is the sample code for reproducing it: > {code:python} > import pandas as pd > import gc > from pyspark.sql import SparkSession > from pyspark.storagelevel import StorageLevel > spark = SparkSession.builder \ > .config("spark.driver.memory", "4g") \ > .config("spark.executor.memory", "4g") \ > .config("spark.sql.execution.arrow.pyspark.enabled", "true") \ > .getOrCreate() > pdf = pd.read_pickle('tmp/input.pickle') > df = spark.createDataFrame(pdf) > df = df.persist(storageLevel=StorageLevel.DISK_ONLY) > df.count() > del pdf > del df > gc.collect() > spark.sparkContext._jvm.System.gc(){code} > After running this code, I perform a manual GC in VisualVM, but the > driver memory usage remains at 550 MB (at the start it was about 50 MB). > !CreateDataFrameWithoutUnpersist.png|width=467,height=349! > Then I tested with adding {{"df = df.unpersist()"}} after the > {{"df.count()"}} line and everything was OK (memory usage after performing a > manual GC was about 50 MB). > !CreateDataFrameWithUnpersist.png|width=468,height=300! 
> Also, I tried reading from a parquet file (without the unpersist line) > with this code: > {code:python} > import gc > from pyspark.sql import SparkSession > from pyspark.storagelevel import StorageLevel > spark = SparkSession.builder \ > .config("spark.driver.memory", "4g") \ > .config("spark.executor.memory", "4g") \ > .config("spark.sql.execution.arrow.pyspark.enabled", "true") \ > .getOrCreate() > df = spark.read.parquet('tmp/input.parquet') > df = df.persist(storageLevel=StorageLevel.DISK_ONLY) > df.count() > del df > gc.collect() > spark.sparkContext._jvm.System.gc(){code} > Again everything was fine, and memory usage was about 50 MB after performing a > manual GC. > !ReadParquetWithoutUnpersist.png|width=473,height=302! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
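The retention chain described in the comment above (CacheManager → physical plan → ParallelCollectionRDD → driver-side byte arrays) can be illustrated outside Spark with a toy cache registry. All names below are illustrative stand-ins, not Spark APIs: as long as the registry pins the "plan", deleting the user's handle and running GC frees nothing; dropping the registry entry (the `unpersist` analogue) makes the data collectible.

```python
import gc
import weakref

class Plan:
    """Stands in for a physical plan that captures driver-side data."""
    def __init__(self, data):
        self.data = data  # analogous to ParallelCollectionPartition contents

cache_manager = {}  # analogous to Spark's CacheManager registry

data = bytearray(10_000_000)       # driver-side data backing the "RDD"
plan = Plan(data)
cache_manager["cached_df"] = plan  # persist(): the cache now pins the plan

probe = weakref.ref(plan)
del data, plan                     # like `del df` in the report
gc.collect()
assert probe() is not None         # still alive: the cache pins it

del cache_manager["cached_df"]     # unpersist() analogue
gc.collect()
assert probe() is None             # now collectible
```

The same shape explains why `unpersist()` (or dropping the cache entry) is what releases the memory, rather than deleting the Python-side reference.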
[jira] [Updated] (SPARK-46125) Memory leak when using createDataFrame with persist
[ https://issues.apache.org/jira/browse/SPARK-46125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-46125: --- Attachment: image-2023-11-28-12-55-58-461.png -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46105) df.emptyDataFrame shows 1 if we repartition(1) in Spark 3.3.x and above
[ https://issues.apache.org/jira/browse/SPARK-46105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790742#comment-17790742 ] Josh Rosen commented on SPARK-46105: {quote}The reason for raising this as a bug is I have a scenario where my final dataframe returns 0 records in EKS (local Spark) with a single node (driver and executor on the same node) but it returns 1 in EMR; both use the same Spark version, 3.3.3. {quote} To clarify: by "returns 0 records", are you referring to the record count of the data frame (i.e. whether isEmpty returns true or false) or to the partition count? In other words, are you saying that EMR returns an incorrect record count or do you mean that it returns an unexpected partition count? > df.emptyDataFrame shows 1 if we repartition(1) in Spark 3.3.x and above > --- > > Key: SPARK-46105 > URL: https://issues.apache.org/jira/browse/SPARK-46105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.3.3 > Environment: EKS > EMR >Reporter: dharani_sugumar >Priority: Major > Attachments: Screenshot 2023-11-26 at 11.54.58 AM.png > > > Version: 3.3.3 > > scala> val df = spark.emptyDataFrame > df: org.apache.spark.sql.DataFrame = [] > scala> df.rdd.getNumPartitions > res0: Int = 0 > scala> df.repartition(1).rdd.getNumPartitions > res1: Int = 1 > scala> df.repartition(1).rdd.isEmpty() > res2: Boolean = true > > Version: 3.2.4 > scala> val df = spark.emptyDataFrame > df: org.apache.spark.sql.DataFrame = [] > scala> df.rdd.getNumPartitions > res0: Int = 0 > scala> df.repartition(1).rdd.getNumPartitions > res1: Int = 0 > scala> df.repartition(1).rdd.isEmpty() > res2: Boolean = true > > Version: 3.5.0 > scala> val df = spark.emptyDataFrame > df: org.apache.spark.sql.DataFrame = [] > scala> df.rdd.getNumPartitions > res0: Int = 0 > scala> df.repartition(1).rdd.getNumPartitions > res1: Int = 1 > scala> df.repartition(1).rdd.isEmpty() > res2: Boolean = true > > When we do a repartition of 1 on an empty dataframe, the resultant partition count is > 1 in versions 3.3.x and 3.5.x, whereas when I do the same in version 3.2.x, the > resultant partition count is 0. May I know why this behaviour changed from 3.2.x > to higher versions? > > The reason for raising this as a bug is I have a scenario where my final > dataframe returns 0 records in EKS (local Spark) with a single node (driver and > executor on the same node) but it returns 1 in EMR; both use the same Spark > version, 3.3.3. I'm not sure why this behaves differently in the two > environments. As an interim solution, I had to repartition an empty dataframe > if my final dataframe is empty, which returns 1 for 3.3.3. I would like to know > whether this is really a bug, or whether this behaviour will persist in future versions and > cannot be changed? > > Because, if we go for a Spark upgrade and this behaviour is changed, we will > face the issue again. > Please confirm. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-44641: --- Labels: correctness (was: ) > SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but > conditions unmet > -- > > Key: SPARK-44641 > URL: https://issues.apache.org/jira/browse/SPARK-44641 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0, 3.4.1 >Reporter: Szehon Ho >Assignee: Chao Sun >Priority: Blocker > Labels: correctness > Fix For: 3.4.2, 3.5.0 > > > Adding the following test case in KeyGroupedPartitionSuite demonstrates the > problem. > > {code:java} > test("test join key is the second partition key and a transform") { > val items_partitions = Array(bucket(8, "id"), days("arrive_time")) > createTable(items, items_schema, items_partitions) > sql(s"INSERT INTO testcat.ns.$items VALUES " + > s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + > s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + > s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + > s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " + > s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") > val purchases_partitions = Array(bucket(8, "item_id"), days("time")) > createTable(purchases, purchases_schema, purchases_partitions) > sql(s"INSERT INTO testcat.ns.$purchases VALUES " + > s"(1, 42.0, cast('2020-01-01' as timestamp)), " + > s"(1, 44.0, cast('2020-01-15' as timestamp)), " + > s"(1, 45.0, cast('2020-01-15' as timestamp)), " + > s"(2, 11.0, cast('2020-01-01' as timestamp)), " + > s"(3, 19.5, cast('2020-02-01' as timestamp))") > withSQLConf( > SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", > SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", > SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> > "true") { > val df = sql("SELECT id, name, i.price as purchase_price, " + > "p.item_id, p.price as sale_price " + > s"FROM 
testcat.ns.$items i JOIN testcat.ns.$purchases p " + > "ON i.arrive_time = p.time " + > "ORDER BY id, purchase_price, p.item_id, sale_price") > val shuffles = collectShuffles(df.queryExecution.executedPlan) > assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys > are partition keys") > checkAnswer(df, > Seq( > Row(1, "aa", 40.0, 1, 42.0), > Row(1, "aa", 40.0, 2, 11.0), > Row(1, "aa", 41.0, 1, 44.0), > Row(1, "aa", 41.0, 1, 45.0), > Row(2, "bb", 10.0, 1, 42.0), > Row(2, "bb", 10.0, 2, 11.0), > Row(2, "bb", 10.5, 1, 42.0), > Row(2, "bb", 10.5, 2, 11.0), > Row(3, "cc", 15.5, 3, 19.5) > ) > ) > } > }{code} > > Note: this test has set up the DataSource V2 to return multiple splits for the > same partition. > In this case, SPJ is not triggered (because the join key does not match the partition > key), but the following code in DSV2Scan: > [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194] > intended to fill the empty partitions for 'pushdown-value' will still iterate > through the non-grouped partitions and look up the grouped partitions to fill the > map, resulting in duplicate input data being fed into the join. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-42134) Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes
[ https://issues.apache.org/jira/browse/SPARK-42134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-42134: --- Labels: correctness (was: ) > Fix getPartitionFiltersAndDataFilters() to handle filters without referenced > attributes > --- > > Key: SPARK-42134 > URL: https://issues.apache.org/jira/browse/SPARK-42134 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Peter Toth >Assignee: Peter Toth >Priority: Major > Labels: correctness > Fix For: 3.3.2, 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43760) Incorrect attribute nullability after RewriteCorrelatedScalarSubquery leads to incorrect query results
[ https://issues.apache.org/jira/browse/SPARK-43760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-43760: --- Labels: correctness (was: ) > Incorrect attribute nullability after RewriteCorrelatedScalarSubquery leads > to incorrect query results > -- > > Key: SPARK-43760 > URL: https://issues.apache.org/jira/browse/SPARK-43760 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Andrey Gubichev >Assignee: Andrey Gubichev >Priority: Major > Labels: correctness > Fix For: 3.4.1, 3.5.0 > > > The following query: > > {code:java} > select * from ( > select t1.id c1, ( > select t2.id c from range (1, 2) t2 > where t1.id = t2.id ) c2 > from range (1, 3) t1 ) t > where t.c2 is not null > -- !query schema > struct > -- !query output > 1 1 > 2 NULL > {code} > > should return 1 row, because the second row is supposed to be removed by > IsNotNull predicate. However, due to a wrong nullability propagation after > subquery decorrelation, the output of the subquery is declared as > not-nullable (incorrectly), so the predicate is constant folded into True. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
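The failure mode in SPARK-43760 above — a subquery output wrongly declared non-nullable, letting the optimizer constant-fold `IsNotNull` to true — can be sketched with a toy folding rule. This is illustrative pseudologic, not Catalyst code:

```python
from dataclasses import dataclass

@dataclass
class Attr:
    name: str
    nullable: bool

def fold_is_not_null(attr: Attr):
    # A folding rule like Catalyst's: IsNotNull(a) on a non-nullable
    # attribute can be constant-folded to TRUE. If `nullable` is wrongly
    # declared False for a column that can actually produce NULLs, the
    # filter silently disappears and NULL rows leak through.
    if not attr.nullable:
        return True   # predicate folded away
    return None       # keep the runtime check

correct = Attr("c2", nullable=True)   # subquery output can be NULL
wrong = Attr("c2", nullable=False)    # nullability lost after the rewrite

assert fold_is_not_null(correct) is None  # filter kept: NULL row removed
assert fold_is_not_null(wrong) is True    # filter folded: NULL row leaks
```

The fix in the ticket is to propagate the correct nullability after `RewriteCorrelatedScalarSubquery`, so the folding rule never sees the wrong `nullable` flag.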
[jira] [Updated] (SPARK-44448) Wrong results for dense_rank() <= k from InferWindowGroupLimit and DenseRankLimitIterator
[ https://issues.apache.org/jira/browse/SPARK-44448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-44448: --- Labels: correctness (was: ) > Wrong results for dense_rank() <= k from InferWindowGroupLimit and > DenseRankLimitIterator > - > > Key: SPARK-44448 > URL: https://issues.apache.org/jira/browse/SPARK-44448 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Jack Chen >Assignee: Jack Chen >Priority: Major > Labels: correctness > Fix For: 3.5.0 > > > Top-k filters on a dense_rank() window function return wrong results, due to > a bug in the optimization InferWindowGroupLimit, specifically in the code for > DenseRankLimitIterator, introduced in > https://issues.apache.org/jira/browse/SPARK-37099. > Repro: > {code:java} > create or replace temp view t1 (p, o) as values (1, 1), (1, 1), (1, 2), (2, > 1), (2, 1), (2, 2); > select * from (select *, dense_rank() over (partition by p order by o) as rnk > from t1) where rnk = 1;{code} > Spark result: > {code:java} > [1,1,1] > [1,1,1] > [2,1,1]{code} > Correct result: > {code:java} > [1,1,1] > [1,1,1] > [2,1,1] > [2,1,1]{code} > > The bug is in {{{}DenseRankLimitIterator{}}}, it fails to reset state > properly when transitioning from one window partition to the next. {{reset}} > only resets {{{}rank = 0{}}}, what it is missing is to reset > {{{}currentRankRow = null{}}}. This means that when processing the second and > later window partitions, the rank incorrectly gets incremented based on > comparing the ordering of the last row of the previous partition to the first > row of the new partition. > This means that a dense_rank window func that has more than one window > partition and more than one row with dense_rank = 1 in the second or later > partitions can give wrong results when optimized. 
> ({{{}RankLimitIterator{}}} narrowly avoids this bug by happenstance, the > first row in the new partition will try to increment rank, but increment it > by the value of count which is 0, so it happens to work by accident). > Unfortunately, tests for the optimization only had a single row per rank, so > did not catch the bug as the bug requires multiple rows per rank. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
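The missing-reset bug described above can be modelled with a small stand-in for the group-limit iterator's state machine. This is a simplification (the exact wrong rows Spark emitted differ from this model), but the mechanism is the same: a stale comparison row carried across the partition boundary inflates ranks in later partitions.

```python
def rows_with_dense_rank_le(rows, k, reset_current_row=True):
    """rows: (partition, order_value) pairs sorted by partition then order.
    Emulates a group-limit iterator keeping rows with dense_rank <= k."""
    kept = []
    rank = 0            # 0-based: dense_rank = rank + 1
    current = None      # last distinct order value seen
    prev_part = object()
    for part, order in rows:
        if part != prev_part:        # new window partition
            prev_part, rank = part, 0
            if reset_current_row:
                current = None       # the fix: also clear the comparison row
        if current is None:
            current = order
        elif order != current:
            rank += 1
            current = order
        if rank + 1 <= k:
            kept.append((part, order))
    return kept

# The ticket's repro data: t1 (p, o)
rows = [(1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2)]
assert rows_with_dense_rank_le(rows, 1) == [(1, 1), (1, 1), (2, 1), (2, 1)]
# With the stale comparison row, ranks in partition 2 are wrongly inflated:
assert rows_with_dense_rank_le(rows, 1, reset_current_row=False) == [(1, 1), (1, 1)]
```

Resetting only `rank` while keeping the old comparison row is exactly the asymmetry the ticket describes in `reset`.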
[jira] [Updated] (SPARK-45920) group by ordinal should be idempotent
[ https://issues.apache.org/jira/browse/SPARK-45920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45920: --- Labels: correctness pull-request-available (was: pull-request-available) > group by ordinal should be idempotent > - > > Key: SPARK-45920 > URL: https://issues.apache.org/jira/browse/SPARK-45920 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.3 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Labels: correctness, pull-request-available > Fix For: 3.4.2, 4.0.0, 3.5.1, 3.3.4 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45507) Correctness bug in correlated scalar subqueries with COUNT aggregates
[ https://issues.apache.org/jira/browse/SPARK-45507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45507: --- Labels: correctness pull-request-available (was: pull-request-available) > Correctness bug in correlated scalar subqueries with COUNT aggregates > - > > Key: SPARK-45507 > URL: https://issues.apache.org/jira/browse/SPARK-45507 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 4.0.0 >Reporter: Andy Lam >Assignee: Andy Lam >Priority: Major > Labels: correctness, pull-request-available > Fix For: 4.0.0 > > > {code:java} > > create view if not exists t1(a1, a2) as values (0, 1), (1, 2); > create view if not exists t2(b1, b2) as values (0, 2), (0, 3); > create view if not exists t3(c1, c2) as values (0, 2), (0, 3); > -- Example 1 > select ( > select SUM(l.cnt + r.cnt) > from (select count(*) cnt from t2 where t1.a1 = t2.b1 having cnt = 0) l > join (select count(*) cnt from t3 where t1.a1 = t3.c1 having cnt = 0) r > on l.cnt = r.cnt > ) from t1 > -- Correct answer: (null, 0) > +--+ > |scalarsubquery(c1, c1)| > +--+ > |null | > |null | > +--+ > -- Example 2 > select ( select sum(cnt) from (select count(*) cnt from t2 where t1.c1 = > t2.c1) ) from t1 > -- Correct answer: (2, 0) > +--+ > |scalarsubquery(c1)| > +--+ > |2 | > |null | > +--+ > -- Example 3 > select ( select count(*) from (select count(*) cnt from t2 where t1.c1 = > t2.c1) ) from t1 > -- Correct answer: (1, 1) > +--+ > |scalarsubquery(c1)| > +--+ > |1 | > |0 | > +--+ {code} > > > DB fiddle for correctness > check:[https://www.db-fiddle.com/f/4jyoMCicNSZpjMt4jFYoz5/10403#] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46092) Overflow in Parquet row group filter creation causes incorrect results
[ https://issues.apache.org/jira/browse/SPARK-46092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-46092: --- Labels: correctness pull-request-available (was: pull-request-available) > Overflow in Parquet row group filter creation causes incorrect results > -- > > Key: SPARK-46092 > URL: https://issues.apache.org/jira/browse/SPARK-46092 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Johan Lasperas >Priority: Major > Labels: correctness, pull-request-available > > While the parquet readers don't support reading parquet values into larger > Spark types, it's possible to trigger an overflow when creating a Parquet row > group filter that will then incorrectly skip row groups and bypass the > exception in the reader. > Repro: > {code:java} > Seq(0).toDF("a").write.parquet(path) > spark.read.schema("a LONG").parquet(path).where(s"a < > ${Long.MaxValue}").collect(){code} > This succeeds and returns no results. This should either fail if the Parquet > reader doesn't support the upcast from int to long or produce result `[0]` if > it does. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
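A hedged sketch of the overflow above: if the filter builder narrows the long literal to the column's 32-bit physical type with a wrapping cast (the ticket does not show Spark's internal filter-building code, so this is an illustration of the arithmetic, not the actual implementation), the pushed-down threshold becomes negative and row-group pruning wrongly discards everything.

```python
import ctypes

def to_int32(v: int) -> int:
    """Truncate a Python int to a signed 32-bit value, as a wrapping
    long->int cast would when building a filter for an INT32 column."""
    return ctypes.c_int32(v & 0xFFFFFFFF).value

LONG_MAX = (1 << 63) - 1

# Predicate pushed down: a < Long.MaxValue, against a row group whose
# INT32 column statistics are min=0, max=0 (the single value written).
threshold = to_int32(LONG_MAX)   # wraps around to -1
row_group_min = 0

# Row-group pruning asks: "can any value satisfy a < threshold?"
can_match = row_group_min < threshold
assert threshold == -1
assert can_match is False   # row group wrongly skipped; query returns no rows
```

This is why the query silently returns an empty result instead of either failing on the unsupported upcast or returning `[0]`.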
[jira] [Updated] (SPARK-45386) Correctness issue when persisting using StorageLevel.NONE
[ https://issues.apache.org/jira/browse/SPARK-45386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45386: --- Labels: correctness pull-request-available (was: pull-request-available) > Correctness issue when persisting using StorageLevel.NONE > - > > Key: SPARK-45386 > URL: https://issues.apache.org/jira/browse/SPARK-45386 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0, 4.0.0 >Reporter: Emil Ejbyfeldt >Assignee: Emil Ejbyfeldt >Priority: Major > Labels: correctness, pull-request-available > Fix For: 3.5.1 > > > When using spark 3.5.0 this code > {code:java} > import org.apache.spark.storage.StorageLevel > spark.createDataset(Seq(1,2,3)).persist(StorageLevel.NONE).count() {code} > incorrectly returns 0. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44871) Fix PERCENTILE_DISC behaviour
[ https://issues.apache.org/jira/browse/SPARK-44871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-44871: --- Labels: correctness (was: ) > Fix PERCENTILE_DISC behaviour > - > > Key: SPARK-44871 > URL: https://issues.apache.org/jira/browse/SPARK-44871 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0, 3.3.1, 3.3.3, 3.3.2, 3.4.0, 3.4.1 >Reporter: Peter Toth >Assignee: Peter Toth >Priority: Critical > Labels: correctness > Fix For: 3.4.2, 3.5.0, 4.0.0, 3.3.4 > > > Currently {{percentile_disc()}} returns incorrect results in some cases: > E.g.: > {code:java} > SELECT > percentile_disc(0.0) WITHIN GROUP (ORDER BY a) as p0, > percentile_disc(0.1) WITHIN GROUP (ORDER BY a) as p1, > percentile_disc(0.2) WITHIN GROUP (ORDER BY a) as p2, > percentile_disc(0.3) WITHIN GROUP (ORDER BY a) as p3, > percentile_disc(0.4) WITHIN GROUP (ORDER BY a) as p4, > percentile_disc(0.5) WITHIN GROUP (ORDER BY a) as p5, > percentile_disc(0.6) WITHIN GROUP (ORDER BY a) as p6, > percentile_disc(0.7) WITHIN GROUP (ORDER BY a) as p7, > percentile_disc(0.8) WITHIN GROUP (ORDER BY a) as p8, > percentile_disc(0.9) WITHIN GROUP (ORDER BY a) as p9, > percentile_disc(1.0) WITHIN GROUP (ORDER BY a) as p10 > FROM VALUES (0), (1), (2), (3), (4) AS v(a) > {code} > returns: > {code:java} > +---+---+---+---+---+---+---+---+---+---+---+ > | p0| p1| p2| p3| p4| p5| p6| p7| p8| p9|p10| > +---+---+---+---+---+---+---+---+---+---+---+ > |0.0|0.0|0.0|1.0|1.0|2.0|2.0|2.0|3.0|3.0|4.0| > +---+---+---+---+---+---+---+---+---+---+---+ > {code} > but it should return: > {noformat} > +---+---+---+---+---+---+---+---+---+---+---+ > | p0| p1| p2| p3| p4| p5| p6| p7| p8| p9|p10| > +---+---+---+---+---+---+---+---+---+---+---+ > |0.0|0.0|0.0|1.0|1.0|2.0|2.0|3.0|3.0|4.0|4.0| > +---+---+---+---+---+---+---+---+---+---+---+ > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
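The expected row in SPARK-44871 above follows from the standard definition of `percentile_disc(p)`: the smallest value whose cumulative distribution is at least p. A reference implementation of that definition (not Spark's internal code) reproduces the "should return" table:

```python
def percentile_disc(values, p):
    """Smallest value v such that cume_dist(v) >= p, for 0 <= p <= 1."""
    xs = sorted(values)
    n = len(xs)
    for i, v in enumerate(xs, start=1):
        if i / n >= p:   # cumulative distribution of v is i / n
            return v
    return xs[-1]

xs = [0.0, 1.0, 2.0, 3.0, 4.0]
# Matches the corrected row in the ticket, e.g. p7 = 3.0 and p9 = 4.0:
assert [percentile_disc(xs, p / 10) for p in range(11)] == \
       [0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0]
```

For five values the cumulative distribution steps through 0.2, 0.4, 0.6, 0.8, 1.0, so `percentile_disc(0.7)` must return the value at cume_dist 0.8 (i.e. 3.0), which is where the pre-fix output disagreed.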
[jira] [Updated] (SPARK-43393) Sequence expression can overflow
[ https://issues.apache.org/jira/browse/SPARK-43393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-43393: --- Labels: correctness pull-request-available (was: pull-request-available) > Sequence expression can overflow > > > Key: SPARK-43393 > URL: https://issues.apache.org/jira/browse/SPARK-43393 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Deepayan Patra >Assignee: Deepayan Patra >Priority: Major > Labels: correctness, pull-request-available > Fix For: 3.4.2, 4.0.0, 3.5.1, 3.3.4 > > > Spark has a (long-standing) overflow bug in the {{sequence}} expression. > > Consider the following operations: > {{spark.sql("CREATE TABLE foo (l LONG);")}} > {{spark.sql(s"INSERT INTO foo VALUES (${Long.MaxValue});")}} > {{spark.sql("SELECT sequence(0, l) FROM foo;").collect()}} > > The result of these operations will be: > {{Array[org.apache.spark.sql.Row] = Array([WrappedArray()])}} > an unintended consequence of overflow. > > The sequence is applied to values {{0}} and {{Long.MaxValue}} with a step > size of {{1}} which uses a length computation defined > [here|https://github.com/apache/spark/blob/16411188c7ba6cb19c46a2bd512b2485a4c03e2c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3451]. > In this calculation, with {{{}start = 0{}}}, {{{}stop = Long.MaxValue{}}}, > and {{{}step = 1{}}}, the calculated {{len}} overflows to > {{{}Long.MinValue{}}}. The computation, in binary, looks like:
> 0111111111111111111111111111111111111111111111111111111111111111 -
> 0000000000000000000000000000000000000000000000000000000000000000
> ----------------------------------------------------------------
> 0111111111111111111111111111111111111111111111111111111111111111 /
> 0000000000000000000000000000000000000000000000000000000000000001
> ----------------------------------------------------------------
> 0111111111111111111111111111111111111111111111111111111111111111 +
> 0000000000000000000000000000000000000000000000000000000000000001
> ----------------------------------------------------------------
> 1000000000000000000000000000000000000000000000000000000000000000
> The following > [check|https://github.com/apache/spark/blob/16411188c7ba6cb19c46a2bd512b2485a4c03e2c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3454] > passes as the negative {{Long.MinValue}} is still {{{}<= > MAX_ROUNDED_ARRAY_LENGTH{}}}. 
The following cast to {{toInt}} uses this > representation and [truncates the upper > bits|https://github.com/apache/spark/blob/16411188c7ba6cb19c46a2bd512b2485a4c03e2c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3457] > resulting in a computed length of 0, i.e. an empty array. > Other overflows are similarly problematic. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
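The arithmetic in SPARK-43393 above can be reproduced by simulating Java's wrapping 64-bit subtraction/addition and the narrowing `(int)` cast. `MAX_ROUNDED_ARRAY_LENGTH` is assumed here to be `Int.MaxValue - 15` (the usual JVM array-size ceiling); the exact constant does not change the outcome, since any non-negative bound is satisfied by a negative length.

```python
def wrap64(v: int) -> int:
    """Simulate Java's wrapping 64-bit long arithmetic."""
    v &= (1 << 64) - 1
    return v - (1 << 64) if v >= (1 << 63) else v

def wrap32(v: int) -> int:
    """Simulate Java's narrowing (int) cast: keep only the low 32 bits."""
    v &= (1 << 32) - 1
    return v - (1 << 32) if v >= (1 << 31) else v

LONG_MAX = (1 << 63) - 1
MAX_ROUNDED_ARRAY_LENGTH = (1 << 31) - 1 - 15  # assumed: Int.MaxValue - 15

start, stop, step = 0, LONG_MAX, 1
length = wrap64(wrap64(stop - start) // step + 1)  # wraps to Long.MinValue
assert length == -(1 << 63)
assert length <= MAX_ROUNDED_ARRAY_LENGTH          # guard passes: negative!
assert wrap32(length) == 0                         # (int) cast truncates to 0
```

The negative length slips past the `<=` bound check, and the narrowing cast then yields 0, which is why the query returns an empty array instead of failing.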
[jira] [Updated] (SPARK-43240) df.describe() method may return wrong result if the last RDD is RDD[UnsafeRow]
[ https://issues.apache.org/jira/browse/SPARK-43240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-43240: --- Labels: correctness (was: ) > df.describe() method may return wrong result if the last RDD is > RDD[UnsafeRow] > --- > > Key: SPARK-43240 > URL: https://issues.apache.org/jira/browse/SPARK-43240 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.3.2 >Reporter: Ke Jia >Assignee: Ke Jia >Priority: Major > Labels: correctness > Fix For: 3.3.3 > > > When calling the df.describe() method, the result may be wrong when the last > RDD is RDD[UnsafeRow]. It is because the UnsafeRow will be released after the > row is used. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43098) Should not handle the COUNT bug when the GROUP BY clause of a correlated scalar subquery is non-empty
[ https://issues.apache.org/jira/browse/SPARK-43098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-43098: --- Labels: correctness (was: ) > Should not handle the COUNT bug when the GROUP BY clause of a correlated > scalar subquery is non-empty > - > > Key: SPARK-43098 > URL: https://issues.apache.org/jira/browse/SPARK-43098 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Jack Chen >Assignee: Jack Chen >Priority: Major > Labels: correctness > Fix For: 3.4.1, 3.5.0 > > > From [~allisonwang-db] : > There is no COUNT bug when the correlated equality predicates are also in the > group by clause. However, the current logic to handle the COUNT bug still > adds default aggregate function value and returns incorrect results. > > {code:java} > create view t1(c1, c2) as values (0, 1), (1, 2); > create view t2(c1, c2) as values (0, 2), (0, 3); > select c1, c2, (select count(*) from t2 where t1.c1 = t2.c1 group by c1) from > t1; > -- Correct answer: [(0, 1, 2), (1, 2, null)] > +---+---+--+ > |c1 |c2 |scalarsubquery(c1)| > +---+---+--+ > |0 |1 |2 | > |1 |2 |0 | > +---+---+--+ > {code} > > This bug affects scalar subqueries in RewriteCorrelatedScalarSubquery, but > lateral subqueries handle it correctly in DecorrelateInnerQuery. Related: > https://issues.apache.org/jira/browse/SPARK-36113 > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45568) WholeStageCodegenSparkSubmitSuite flakiness
[ https://issues.apache.org/jira/browse/SPARK-45568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45568: --- Component/s: Tests > WholeStageCodegenSparkSubmitSuite flakiness > --- > > Key: SPARK-45568 > URL: https://issues.apache.org/jira/browse/SPARK-45568 > Project: Spark > Issue Type: Test > Components: Tests, Web UI >Affects Versions: 4.0.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > Labels: pull-request-available > Fix For: 3.4.2, 4.0.0, 3.5.1, 3.3.4 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45751) The default value of ‘spark.executor.logs.rolling.maxRetainedFiles' on the official website is incorrect
[ https://issues.apache.org/jira/browse/SPARK-45751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45751: --- Component/s: Documentation > The default value of ‘spark.executor.logs.rolling.maxRetainedFiles' on the > official website is incorrect > > > Key: SPARK-45751 > URL: https://issues.apache.org/jira/browse/SPARK-45751 > Project: Spark > Issue Type: Improvement > Components: Documentation, Spark Core, UI >Affects Versions: 3.5.0 >Reporter: chenyu >Assignee: chenyu >Priority: Trivial > Labels: pull-request-available > Fix For: 3.4.2, 4.0.0, 3.5.1, 3.3.4 > > Attachments: the default value.png, the value on the website.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45791) Rename `SparkConnectSessionHodlerSuite.scala` to `SparkConnectSessionHolderSuite.scala`
[ https://issues.apache.org/jira/browse/SPARK-45791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45791: --- Component/s: Tests > Rename `SparkConnectSessionHodlerSuite.scala` to > `SparkConnectSessionHolderSuite.scala` > --- > > Key: SPARK-45791 > URL: https://issues.apache.org/jira/browse/SPARK-45791 > Project: Spark > Issue Type: Bug > Components: Connect, Tests >Affects Versions: 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Trivial > Labels: pull-request-available > Fix For: 4.0.0, 3.5.1 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46037) When Left Join build Left, ShuffledHashJoinExec may result in incorrect results
[ https://issues.apache.org/jira/browse/SPARK-46037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-46037: --- Labels: correctness pull-request-available (was: pull-request-available) > When Left Join build Left, ShuffledHashJoinExec may result in incorrect > results > --- > > Key: SPARK-46037 > URL: https://issues.apache.org/jira/browse/SPARK-46037 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: mcdull_zhang >Priority: Minor > Labels: correctness, pull-request-available > > When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may > have incorrect results. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-4836) Web UI should display separate information for all stage attempts
[ https://issues.apache.org/jira/browse/SPARK-4836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reopened SPARK-4836: --- > Web UI should display separate information for all stage attempts > - > > Key: SPARK-4836 > URL: https://issues.apache.org/jira/browse/SPARK-4836 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 1.1.1, 1.2.0 >Reporter: Josh Rosen >Priority: Major > Labels: bulk-closed > > I've run into some cases where the web UI job page will say that a job took > 12 minutes but the sum of that job's stage times is something like 10 > seconds. In this case, it turns out that my job ran a stage to completion > (which took, say, 5 minutes) then lost some partitions of that stage and had > to run a new stage attempt to recompute one or two tasks from that stage. As > a result, the latest attempt for that stage reports only one or two tasks. > In the web UI, it seems that we only show the latest stage attempt, not all > attempts, which can lead to confusing / misleading displays for jobs with > failed / partially-recomputed stages. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Description: We need to add `--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED` to our JVM options so that the code in `org.apache.spark.unsafe.Platform` can access the JDK internal cleaner classes. (was: We need to update the ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method.) > Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can > access cleaner on Java 9+ > -- > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > We need to add `--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED` to our > JVM options so that the code in `org.apache.spark.unsafe.Platform` can access > the JDK internal cleaner classes. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Summary: Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+ (was: Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 11+) > Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can > access cleaner on Java 9+ > -- > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > We need to update the > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 11+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Summary: Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 11+ (was: Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+) > Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can > access cleaner on Java 11+ > --- > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > In JDK >= 9.b110, the code at > [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] > hits a fallback path because we are using the wrong cleaner class name: > `jdk.internal.ref.Cleaner` was removed in > [https://bugs.openjdk.org/browse/JDK-8149925] > This can be verified via > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 11+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Description: We need to update the ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. was: In JDK >= 9.b110, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name: `jdk.internal.ref.Cleaner` was removed in [https://bugs.openjdk.org/browse/JDK-8149925] This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. > Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can > access cleaner on Java 11+ > --- > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > We need to update the > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Summary: Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access cleaner on Java 9+ (was: org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+) > Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can > access cleaner on Java 9+ > -- > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > In JDK >= 9.b110, the code at > [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] > hits a fallback path because we are using the wrong cleaner class name: > `jdk.internal.ref.Cleaner` was removed in > [https://bugs.openjdk.org/browse/JDK-8149925] > This can be verified via > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Description: In JDK >= 9.b110, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name: `jdk.internal.ref.Cleaner` was removed in [https://bugs.openjdk.org/browse/JDK-8149925] This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. was: In JDK >= 9.b110, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name: `jdk.internal.ref.Cleaner` was removed in JDK-8149925 [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8149925] This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. 
> org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+ > - > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > In JDK >= 9.b110, the code at > [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] > hits a fallback path because we are using the wrong cleaner class name: > `jdk.internal.ref.Cleaner` was removed in > [https://bugs.openjdk.org/browse/JDK-8149925] > This can be verified via > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45508) org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+
[ https://issues.apache.org/jira/browse/SPARK-45508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-45508: --- Description: In JDK >= 9.b110, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name: `jdk.internal.ref.Cleaner` was removed in JDK-8149925 [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8149925] This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. was: In JDK 11+, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name. This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. 
Summary: org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+ (was: org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 11+) > org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 9.b110+ > - > > Key: SPARK-45508 > URL: https://issues.apache.org/jira/browse/SPARK-45508 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Josh Rosen >Priority: Major > > In JDK >= 9.b110, the code at > [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] > hits a fallback path because we are using the wrong cleaner class name: > `jdk.internal.ref.Cleaner` was removed in JDK-8149925 > [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8149925] > This can be verified via > > ``` > val f = > classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") > f.setAccessible(true) > f.get(null) > ``` > returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45508) org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 11+
Josh Rosen created SPARK-45508: -- Summary: org.apache.spark.unsafe.Platform uses wrong cleaner class name in JDK 11+ Key: SPARK-45508 URL: https://issues.apache.org/jira/browse/SPARK-45508 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.5.0 Reporter: Josh Rosen In JDK 11+, the code at [https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213] hits a fallback path because we are using the wrong cleaner class name. This can be verified via ``` val f = classOf[org.apache.spark.unsafe.Platform].getDeclaredField("CLEANER_CREATE_METHOD") f.setAccessible(true) f.get(null) ``` returning `null` instead of a method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
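The lookup that SPARK-45508 describes — probing candidate cleaner class names by reflection and falling through when a name no longer exists on the running JVM — can be sketched generically. The helper name and candidate list below are illustrative assumptions; the real logic lives in `org.apache.spark.unsafe.Platform`.

```java
public class CleanerLookupDemo {
    // Return the first class the current JVM can actually load, or null if none loads.
    static Class<?> firstLoadable(String... candidates) {
        for (String name : candidates) {
            try {
                return Class.forName(name);
            } catch (ClassNotFoundException e) {
                // Expected on JVMs where this candidate was removed
                // (e.g. jdk.internal.ref.Cleaner on JDK >= 9.b110).
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // On modern JDKs the first name fails to load and the probe must fall through.
        Class<?> cleaner = firstLoadable("jdk.internal.ref.Cleaner", "java.lang.ref.Cleaner");
        System.out.println(cleaner == null ? "no cleaner class found" : cleaner.getName());
    }
}
```

Note that even when the class loads, invoking its members from an unnamed module still requires the `--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED` flag the issue title proposes.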
[jira] [Resolved] (SPARK-42205) Remove logging of Accumulables in Task/Stage start events in JsonProtocol
[ https://issues.apache.org/jira/browse/SPARK-42205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-42205. Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 39767 [https://github.com/apache/spark/pull/39767] > Remove logging of Accumulables in Task/Stage start events in JsonProtocol > - > > Key: SPARK-42205 > URL: https://issues.apache.org/jira/browse/SPARK-42205 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > > Spark's JsonProtocol event logs (used by the history server) are impacted by > a race condition when tasks / stages finish very quickly: > The SparkListenerTaskStart and SparkListenerStageSubmitted events contain > mutable TaskInfo and StageInfo objects, which in turn contain Accumulables > fields. When a task or stage is submitted, Accumulables is initially empty. > When the task or stage finishes, this field is updated with values from the > task. > If a task or stage finishes before the start event has been logged by the > event logging listener then the _start_ event will contain the Accumulable > values from the task or stage _end_ event. > This information isn't used by the History Server and contributes to wasteful > bloat in event log sizes. In one real-world log, I found that ~10% of the > uncompressed log size was due to these redundant Accumulable fields. > I propose that we update JsonProtocol to skip the logging of this field for > Start/Submitted events. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
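The SPARK-42205 change amounts to making serialization conditional on event type: start/submitted events skip the (empty-or-racy) accumulables, end events keep them. A minimal hand-rolled sketch of that idea follows; it uses no JSON library, and `taskInfoToJson` / `includeAccumulables` are invented names, not the real JsonProtocol signatures.

```java
public class EventLogDemo {
    // Serialize a (toy) task info; start events omit accumulables entirely.
    static String taskInfoToJson(long taskId, String accumulablesJson, boolean includeAccumulables) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"Task ID\":").append(taskId);
        if (includeAccumulables) {
            sb.append(",\"Accumulables\":").append(accumulablesJson);
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Start event: accumulables would be empty or racy, so drop them.
        System.out.println(taskInfoToJson(1, "[{\"id\":5}]", false));
        // End event: accumulables carry real metric updates, so keep them.
        System.out.println(taskInfoToJson(1, "[{\"id\":5}]", true));
    }
}
```

Dropping the field only on start events is what removes the ~10% log bloat without losing any data the history server actually reads.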
[jira] [Assigned] (SPARK-44920) Use await() instead of awaitUninterruptibly() in TransportClientFactory.createClient()
[ https://issues.apache.org/jira/browse/SPARK-44920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-44920: -- Assignee: Josh Rosen > Use await() instead of awaitUninterruptibly() in > TransportClientFactory.createClient() > --- > > Key: SPARK-44920 > URL: https://issues.apache.org/jira/browse/SPARK-44920 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.3, 3.4.2, 3.5.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > This is a follow-up to SPARK-44241: > That change added an `awaitUninterruptibly()` call, which I think should be a > plain `await()` instead. This will prevent issues when cancelling tasks with > hanging network connections. > This issue is similar to SPARK-19529. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44920) Use await() instead of awaitUninterruptibly() in TransportClientFactory.createClient()
Josh Rosen created SPARK-44920: -- Summary: Use await() instead of awaitUninterruptibly() in TransportClientFactory.createClient() Key: SPARK-44920 URL: https://issues.apache.org/jira/browse/SPARK-44920 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.3.3, 3.4.2, 3.5.0 Reporter: Josh Rosen This is a follow-up to SPARK-44241: That change added an `awaitUninterruptibly()` call, which I think should be a plain `await()` instead. This will prevent issues when cancelling tasks with hanging network connections. This issue is similar to SPARK-19529. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
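The distinction in SPARK-44920 matters because an uninterruptible wait swallows thread interrupts, so a task being killed can stay blocked on a dead connection. A small generic illustration (using `CountDownLatch` rather than Netty's futures) of why an interruptible `await()` lets cancellation through:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class AwaitDemo {
    // Block a thread on an interruptible await, interrupt it, and report whether it woke up.
    static boolean interruptUnblocksAwait() {
        CountDownLatch never = new CountDownLatch(1); // a "connection" that never completes
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread task = new Thread(() -> {
            try {
                never.await(); // interruptible: a task kill can break out of this wait
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // an uninterruptible wait would swallow this and keep blocking
            }
        });
        task.start();
        try {
            Thread.sleep(100); // give the task time to block
            task.interrupt();  // simulate task cancellation
            task.join(5000);
        } catch (InterruptedException e) {
            return false;
        }
        return sawInterrupt.get() && !task.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("cancellation unblocked the wait: " + interruptUnblocksAwait());
    }
}
```

With Netty's `awaitUninterruptibly()` the analogous thread would remain blocked until the connection attempt itself timed out, which is the hang the issue describes.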
[jira] [Resolved] (SPARK-44818) Fix race for pending interrupt issued before taskThread is initialized
[ https://issues.apache.org/jira/browse/SPARK-44818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-44818. Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 42504 [https://github.com/apache/spark/pull/42504] > Fix race for pending interrupt issued before taskThread is initialized > -- > > Key: SPARK-44818 > URL: https://issues.apache.org/jira/browse/SPARK-44818 > Project: Spark > Issue Type: Task > Components: Spark Core, Structured Streaming >Affects Versions: 3.5.1 >Reporter: Anish Shrigondekar >Assignee: Anish Shrigondekar >Priority: Major > Fix For: 4.0.0 > > > Fix race for pending interrupt issued before taskThread is initialized -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-44818) Fix race for pending interrupt issued before taskThread is initialized
[ https://issues.apache.org/jira/browse/SPARK-44818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-44818: -- Assignee: Anish Shrigondekar > Fix race for pending interrupt issued before taskThread is initialized > -- > > Key: SPARK-44818 > URL: https://issues.apache.org/jira/browse/SPARK-44818 > Project: Spark > Issue Type: Task > Components: Spark Core, Structured Streaming >Affects Versions: 3.5.1 >Reporter: Anish Shrigondekar >Assignee: Anish Shrigondekar >Priority: Major > > Fix race for pending interrupt issued before taskThread is initialized -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-43300) Cascade failure in Guava cache due to fate-sharing
[ https://issues.apache.org/jira/browse/SPARK-43300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-43300. Fix Version/s: 3.5.0 Resolution: Fixed > Cascade failure in Guava cache due to fate-sharing > -- > > Key: SPARK-43300 > URL: https://issues.apache.org/jira/browse/SPARK-43300 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Assignee: Ziqi Liu >Priority: Major > Fix For: 3.5.0 > > > Guava cache is widely used in Spark; however, it suffers from fate-sharing > behavior: if multiple requests try to access the same key in the > {{cache}} at the same time when the key is not in the cache, Guava cache will > block all requests and create the object only once. If the creation fails, > all requests fail immediately without retry, so we might see a task fail > because of an unrelated failure in another query, due to fate-sharing. > This fate-sharing behavior might lead to unexpected results in some situations. > We can wrap the Guava cache with a KeyLock to synchronize all requests > with the same key, so they will run individually and fail as if they arrived one > at a time. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43300) Cascade failure in Guava cache due to fate-sharing
[ https://issues.apache.org/jira/browse/SPARK-43300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722959#comment-17722959 ] Josh Rosen commented on SPARK-43300: Fixed in https://github.com/apache/spark/pull/40982 > Cascade failure in Guava cache due to fate-sharing > -- > > Key: SPARK-43300 > URL: https://issues.apache.org/jira/browse/SPARK-43300 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Priority: Major > > Guava cache is widely used in Spark; however, it suffers from fate-sharing > behavior: if multiple requests try to access the same key in the > {{cache}} at the same time when the key is not in the cache, Guava cache will > block all requests and create the object only once. If the creation fails, > all requests fail immediately without retry, so we might see a task fail > because of an unrelated failure in another query, due to fate-sharing. > This fate-sharing behavior might lead to unexpected results in some situations. > We can wrap the Guava cache with a KeyLock to synchronize all requests > with the same key, so they will run individually and fail as if they arrived one > at a time. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-43300) Cascade failure in Guava cache due to fate-sharing
[ https://issues.apache.org/jira/browse/SPARK-43300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-43300: -- Assignee: Ziqi Liu > Cascade failure in Guava cache due to fate-sharing > -- > > Key: SPARK-43300 > URL: https://issues.apache.org/jira/browse/SPARK-43300 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Ziqi Liu >Assignee: Ziqi Liu >Priority: Major > > Guava cache is widely used in Spark; however, it suffers from fate-sharing > behavior: if multiple requests try to access the same key in the > {{cache}} at the same time when the key is not in the cache, Guava cache will > block all requests and create the object only once. If the creation fails, > all requests fail immediately without retry, so we might see a task fail > because of an unrelated failure in another query, due to fate-sharing. > This fate-sharing behavior might lead to unexpected results in some situations. > We can wrap the Guava cache with a KeyLock to synchronize all requests > with the same key, so they will run individually and fail as if they arrived one > at a time. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
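The KeyLock idea from the SPARK-43300 description can be sketched as follows: callers for the same key serialize on a per-key lock and each attempts creation itself, so one caller's failure is never handed to the others. This is a simplified illustration, not Spark's actual KeyLock class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class KeyLockCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Map<K, Object> locks = new ConcurrentHashMap<>();

    // Each caller holds the per-key lock and runs the loader itself if needed.
    // A loader failure propagates only to the caller that ran it; nothing is
    // cached in that case, so the next caller simply retries instead of
    // fate-sharing the first caller's exception.
    public V get(K key, Supplier<V> loader) {
        Object lock = locks.computeIfAbsent(key, k -> new Object());
        synchronized (lock) {
            V cached = cache.get(key);
            if (cached != null) return cached;
            V created = loader.get(); // may throw; the failure is not cached
            cache.put(key, created);
            return created;
        }
    }

    public static void main(String[] args) {
        KeyLockCache<String, String> cache = new KeyLockCache<>();
        try {
            cache.get("session", () -> { throw new RuntimeException("creation failed"); });
        } catch (RuntimeException e) {
            System.out.println("first caller saw: " + e.getMessage());
        }
        // The failure was not cached, so a later caller retries and succeeds.
        System.out.println("second caller got: " + cache.get("session", () -> "created"));
    }
}
```

Contrast with a shared Guava load, where both callers would have received the same `creation failed` exception.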
[jira] [Created] (SPARK-43414) Fix flakiness in Kafka RDD suites due to port binding configuration issue
Josh Rosen created SPARK-43414: -- Summary: Fix flakiness in Kafka RDD suites due to port binding configuration issue Key: SPARK-43414 URL: https://issues.apache.org/jira/browse/SPARK-43414 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.4.0 Reporter: Josh Rosen Assignee: Josh Rosen In SPARK-36837 we updated Kafka to 3.1.0, which uses a different set of configuration options for the broker listener port. That PR only updated one of two KafkaTestUtils files (the SQL one), so the other one (used by Core tests) had an ineffective port binding configuration and would bind to the default 9092 port. This could lead to flakiness if multiple suites binding to that port ran in parallel. To fix this, we just need to copy the updated port binding configuration from the other suite. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
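The underlying fixture problem in SPARK-43414 — two test suites racing for a fixed port like 9092 — is conventionally avoided by asking the OS for an ephemeral port (port 0) and reading back what was assigned. As a generic sketch of that pattern (this is the general technique, not the KafkaTestUtils code itself):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortDemo {
    // Bind to port 0 and let the OS pick a free port, eliminating fixed-port
    // collisions between test suites running in parallel.
    static int pickFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException("could not bind an ephemeral port", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("a broker test fixture could bind to port " + pickFreePort());
    }
}
```

A fixture that instead hard-codes (or silently falls back to) a well-known port is exactly the flakiness source the issue describes.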
[jira] [Updated] (SPARK-42754) Spark 3.4 history server's SQL tab incorrectly groups SQL executions when replaying event logs from Spark 3.3 and earlier
[ https://issues.apache.org/jira/browse/SPARK-42754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-42754: --- Description:

In Spark 3.4.0 RC4, the Spark History Server's SQL tab incorrectly groups SQL executions when replaying event logs generated by older Spark versions.

{*}Reproduction{*}: In {{./bin/spark-shell --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=eventlogs}}, run three non-nested SQL queries:

{code:java}
sql("select * from range(10)").collect()
sql("select * from range(20)").collect()
sql("select * from range(30)").collect()
{code}

Exit the shell and use the Spark History Server to replay this application's UI. In the SQL tab I expect to see three separate queries, but Spark 3.4's history server incorrectly groups the second and third queries as nested queries of the first (see attached screenshot).

{*}Root cause{*}: [https://github.com/apache/spark/pull/39268] / SPARK-41752 added a new *non-optional* {{rootExecutionId: Long}} field to the SparkListenerSQLExecutionStart case class. When JsonProtocol deserializes this event it uses the "ignore missing properties" Jackson deserialization option, causing the {{rootExecutionId}} field to be initialized with a default value of {{0}}. The value {{0}} is a legitimate execution ID, so in the deserialized event we have no way to distinguish between the absence of a value and a case where all queries have the first query as the root.

{*}Proposed fix{*}: I think we should change this field to be of type {{Option[Long]}}. I believe this is a release blocker for Spark 3.4.0 because we cannot change the type of this new field in a future release without breaking binary compatibility.
[jira] [Created] (SPARK-42754) Spark 3.4 history server's SQL tab incorrectly groups SQL executions when replaying event logs from Spark 3.3 and earlier
Josh Rosen created SPARK-42754: -- Summary: Spark 3.4 history server's SQL tab incorrectly groups SQL executions when replaying event logs from Spark 3.3 and earlier Key: SPARK-42754 URL: https://issues.apache.org/jira/browse/SPARK-42754 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: Josh Rosen Attachments: example.png

In Spark 3.4.0 RC4, the Spark History Server's SQL tab incorrectly groups SQL executions when replaying event logs generated by older Spark versions.

{*}Reproduction{*}: In {{./bin/spark-shell --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=eventlogs}}, run three non-nested SQL queries:

{code:java}
sql("select * from range(10)").collect()
sql("select * from range(20)").collect()
sql("select * from range(30)").collect()
{code}

Exit the shell and use the Spark History Server to replay this application's UI. In the SQL tab I expect to see three separate queries, but Spark 3.4's history server incorrectly groups the second and third queries as nested queries of the first (see attached screenshot).

{*}Root cause{*}: [https://github.com/apache/spark/pull/39268] / SPARK-41752 added a new *non-optional* {{rootExecutionId: Long}} field to the SparkListenerSQLExecutionStart case class. When JsonProtocol deserializes this event it uses the "ignore missing properties" Jackson deserialization option, causing the {{rootExecutionId}} field to be initialized with a default value of {{0}}. The value {{0}} is a legitimate execution ID, so in the deserialized event we have no way to distinguish between the absence of a value and a case where all queries have the first query as the root.

{*}Proposed fix{*}: I think we should change this field to be of type {{Option[Long]}}. I believe this is a release blocker for Spark 3.4.0 because we cannot change the type of this new field in a future release without breaking binary compatibility.
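The root-cause ambiguity above can be sketched in Python as a stand-in for the JVM/Jackson behavior (field names and the JSON shape are illustrative, not JsonProtocol's exact schema): a non-optional `Long` decoded with missing-property tolerance collapses "field absent" and "root is execution 0" into the same value, while an `Option[Long]`-style decoding keeps them distinct.

```python
import json

# Hypothetical stand-in for decoding SparkListenerSQLExecutionStart events.
def parse_non_optional(event_json: str) -> int:
    # Mimics Jackson's "ignore missing properties" plus the primitive
    # default: a missing rootExecutionId silently becomes 0.
    return json.loads(event_json).get("rootExecutionId", 0)

def parse_optional(event_json: str):
    # The proposed Option[Long]-style fix: absence stays distinguishable.
    return json.loads(event_json).get("rootExecutionId")  # None when absent

old_log = '{"executionId": 1}'                            # written by Spark 3.3
new_log = '{"executionId": 1, "rootExecutionId": 0}'      # written by Spark 3.4

# Non-optional decoding conflates "absent" with "root is execution 0":
assert parse_non_optional(old_log) == parse_non_optional(new_log) == 0
# Optional decoding keeps the two cases distinct:
assert parse_optional(old_log) is None
assert parse_optional(new_log) == 0
```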
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-42754) Spark 3.4 history server's SQL tab incorrectly groups SQL executions when replaying event logs from Spark 3.3 and earlier
[ https://issues.apache.org/jira/browse/SPARK-42754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-42754: --- Attachment: example.png

> Spark 3.4 history server's SQL tab incorrectly groups SQL executions when
> replaying event logs from Spark 3.3 and earlier
>
> Key: SPARK-42754
> URL: https://issues.apache.org/jira/browse/SPARK-42754
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Josh Rosen
> Priority: Blocker
> Attachments: example.png
[jira] [Assigned] (SPARK-42206) Omit "Task Executor Metrics" field in JsonProtocol output if values are all zero
[ https://issues.apache.org/jira/browse/SPARK-42206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-42206: -- Assignee: Josh Rosen > Omit "Task Executor Metrics" field in JsonProtocol output if values are all > zero > > > Key: SPARK-42206 > URL: https://issues.apache.org/jira/browse/SPARK-42206 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > SPARK-26329 added "Task Executor Metrics" to JsonProtocol > SparkListenerTaskEnd JSON. With the default > `spark.executor.metrics.pollingInterval = 0` configuration these metric > values are only updated when heartbeats occur. If a task launches and > finishes between executor heartbeats then all of these metric values will be > zero. For jobs with large numbers of short tasks, this contributes to > significant event log bloat. > JsonProtocol already knows how to handle the absence of the "Task Executor > Metrics" field, so I think it's safe for us to omit this field when all > values are zero. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-42205) Remove logging of Accumulables in Task/Stage start events in JsonProtocol
[ https://issues.apache.org/jira/browse/SPARK-42205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-42205: -- Assignee: Josh Rosen > Remove logging of Accumulables in Task/Stage start events in JsonProtocol > - > > Key: SPARK-42205 > URL: https://issues.apache.org/jira/browse/SPARK-42205 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > Spark's JsonProtocol event logs (used by the history server) are impacted by > a race condition when tasks / stages finish very quickly: > The SparkListenerTaskStart and SparkListenerStageSubmitted events contain > mutable TaskInfo and StageInfo objects, which in turn contain Accumulables > fields. When a task or stage is submitted, Accumulables is initially empty. > When the task or stage finishes, this field is updated with values from the > task. > If a task or stage finishes before the start event has been logged by the > event logging listener then the _start_ event will contain the Accumulable > values from the task or stage _end_ event. > This information isn't used by the History Server and contributes to wasteful > bloat in event log sizes. In one real-world log, I found that ~10% of the > uncompressed log size was due to these redundant Accumulable fields. > I propose that we update JsonProtocol to skip the logging of this field for > Start/Submitted events. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-42204) Remove redundant logging of TaskMetrics internal accumulators in JsonProtocol event logs
[ https://issues.apache.org/jira/browse/SPARK-42204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-42204: -- Assignee: Josh Rosen > Remove redundant logging of TaskMetrics internal accumulators in JsonProtocol > event logs > > > Key: SPARK-42204 > URL: https://issues.apache.org/jira/browse/SPARK-42204 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > Spark's JsonProtocol event logs (used by the history server) contain > redundancy in how TaskMetrics are represented in SparkListenerTaskEnd events: > * The "Task Metrics" field is a map from metric names to values. > * Under the hood, each metric is implemented using an accumulator and those > accumulator values are redundantly stored in the `Task Info`.`Accumulables` > field. These Accumulable entries contain the metric value from the task, plus > the cumulative "sum-so-far" from the completed tasks in that stage. > The Spark History Server doesn't rely on the redundant information in the > Accumulables field. > I believe that this redundancy was introduced back in SPARK-10620 when Spark > 1.x's separate TaskMetrics implementation was replaced by the current > accumulator-based version. > I think that we should eliminate this redundancy by skipping JsonProtocol > logging of the TaskMetric accumulators. Although I think it's somewhat > unlikely that third-party code is relying on the presence of that redundant > information, I think we should hedge by adding an internal configuration flag > to re-enable the redundant logging if needed. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-42206) Omit "Task Executor Metrics" field in JsonProtocol output if values are all zero
Josh Rosen created SPARK-42206: -- Summary: Omit "Task Executor Metrics" field in JsonProtocol output if values are all zero Key: SPARK-42206 URL: https://issues.apache.org/jira/browse/SPARK-42206 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen SPARK-26329 added "Task Executor Metrics" to JsonProtocol SparkListenerTaskEnd JSON. With the default `spark.executor.metrics.pollingInterval = 0` configuration these metric values are only updated when heartbeats occur. If a task launches and finishes between executor heartbeats then all of these metric values will be zero. For jobs with large numbers of short tasks, this contributes to significant event log bloat. JsonProtocol already knows how to handle the absence of the "Task Executor Metrics" field, so I think it's safe for us to omit this field when all values are zero. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
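The proposed change amounts to a conditional field in the serialized task-end event. A minimal sketch (field and metric names are illustrative, not Spark's exact JsonProtocol schema):

```python
def serialize_task_end(executor_metrics: dict) -> dict:
    """Emit a task-end event, dropping the "Task Executor Metrics" field
    entirely when every metric value is zero (i.e. the task finished
    between heartbeats and no samples were taken)."""
    event = {"Event": "SparkListenerTaskEnd"}
    if any(v != 0 for v in executor_metrics.values()):
        event["Task Executor Metrics"] = executor_metrics
    return event

# Short task that finished between heartbeats: all metrics still zero.
assert "Task Executor Metrics" not in serialize_task_end(
    {"JVMHeapMemory": 0, "OnHeapStorageMemory": 0})
# Task with real heartbeat samples keeps the field.
assert "Task Executor Metrics" in serialize_task_end(
    {"JVMHeapMemory": 1024, "OnHeapStorageMemory": 0})
```

Since readers already tolerate the field's absence, omission is backwards-compatible with existing event-log consumers.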
[jira] [Created] (SPARK-42205) Remove logging of Accumulables in Task/Stage start events in JsonProtocol
Josh Rosen created SPARK-42205: -- Summary: Remove logging of Accumulables in Task/Stage start events in JsonProtocol Key: SPARK-42205 URL: https://issues.apache.org/jira/browse/SPARK-42205 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen Spark's JsonProtocol event logs (used by the history server) are impacted by a race condition when tasks / stages finish very quickly: The SparkListenerTaskStart and SparkListenerStageSubmitted events contain mutable TaskInfo and StageInfo objects, which in turn contain Accumulables fields. When a task or stage is submitted, Accumulables is initially empty. When the task or stage finishes, this field is updated with values from the task. If a task or stage finishes before the start event has been logged by the event logging listener then the _start_ event will contain the Accumulable values from the task or stage _end_ event. This information isn't used by the History Server and contributes to wasteful bloat in event log sizes. In one real-world log, I found that ~10% of the uncompressed log size was due to these redundant Accumulable fields. I propose that we update JsonProtocol to skip the logging of this field for Start/Submitted events. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
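The proposed fix is to serialize start/submitted events with an always-empty Accumulables list, regardless of what the mutable in-memory TaskInfo happens to contain by serialization time. A toy sketch (field names follow JsonProtocol loosely; details are illustrative):

```python
def log_task_start(task_info: dict) -> dict:
    """Serialize a SparkListenerTaskStart-style event: Accumulables are
    semantically empty at submission, so always log them as empty even if
    a fast-finishing task has already mutated the shared TaskInfo."""
    logged = dict(task_info)          # shallow copy; don't mutate the original
    logged["Accumulables"] = []       # always empty in start/submitted events
    return logged

# Race: the task finished before the start event was serialized, so the
# mutable TaskInfo already carries end-of-task accumulator values.
info = {"Task ID": 7,
        "Accumulables": [{"Name": "internal.metrics.resultSize", "Value": 123}]}
assert log_task_start(info)["Accumulables"] == []
assert info["Accumulables"] != []     # the in-memory object is untouched
```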
[jira] [Created] (SPARK-42204) Remove redundant logging of TaskMetrics internal accumulators in JsonProtocol event logs
Josh Rosen created SPARK-42204: -- Summary: Remove redundant logging of TaskMetrics internal accumulators in JsonProtocol event logs Key: SPARK-42204 URL: https://issues.apache.org/jira/browse/SPARK-42204 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen Spark's JsonProtocol event logs (used by the history server) contain redundancy in how TaskMetrics are represented in SparkListenerTaskEnd events: * The "Task Metrics" field is a map from metric names to values. * Under the hood, each metric is implemented using an accumulator and those accumulator values are redundantly stored in the `Task Info`.`Accumulables` field. These Accumulable entries contain the metric value from the task, plus the cumulative "sum-so-far" from the completed tasks in that stage. The Spark History Server doesn't rely on the redundant information in the Accumulables field. I believe that this redundancy was introduced back in SPARK-10620 when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version. I think that we should eliminate this redundancy by skipping JsonProtocol logging of the TaskMetric accumulators. Although I think it's somewhat unlikely that third-party code is relying on the presence of that redundant information, I think we should hedge by adding an internal configuration flag to re-enable the redundant logging if needed. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
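The hedge described above (skip the redundant internal accumulators by default, with a flag to restore the old output) can be sketched as a filter over the Accumulables list. Names and the flag are illustrative, not Spark's actual configuration keys:

```python
def serialize_accumulables(accumulables, include_task_metrics_accumulators=False):
    """Drop internal TaskMetrics accumulators from "Task Info"."Accumulables"
    unless a fallback flag re-enables the old redundant logging for
    third-party consumers that might depend on it."""
    return [a for a in accumulables
            if include_task_metrics_accumulators or not a["internal"]]

accums = [
    {"name": "internal.metrics.executorRunTime", "internal": True, "value": 42},
    {"name": "my.user.counter", "internal": False, "value": 7},
]
# Default: internal metric accumulators (already present in "Task Metrics")
# are dropped, shrinking the event log.
assert [a["name"] for a in serialize_accumulables(accums)] == ["my.user.counter"]
# Escape hatch restores the old redundant behavior.
assert len(serialize_accumulables(accums, include_task_metrics_accumulators=True)) == 2
```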
[jira] [Created] (SPARK-42203) JsonProtocol should skip logging of push-based shuffle read metrics when push-based shuffle is disabled
Josh Rosen created SPARK-42203: -- Summary: JsonProtocol should skip logging of push-based shuffle read metrics when push-based shuffle is disabled Key: SPARK-42203 URL: https://issues.apache.org/jira/browse/SPARK-42203 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.4.0 Reporter: Josh Rosen This is a followup to SPARK-36620: When push-based shuffle is disabled (the default), I think that we should skip the logging of the new push-based shuffle read metrics. Because these metrics are logged for every task, they will add significant additional size to Spark event logs. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
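The proposal is another conditional-omission change: only write the push-based shuffle read metrics into each task's JSON when push-based shuffle is actually on. A minimal sketch (metric and field names are illustrative):

```python
def shuffle_read_metrics_json(metrics: dict, push_based_shuffle_enabled: bool) -> dict:
    """Serialize shuffle read metrics, including the push-based subtree
    only when the feature is enabled (it is disabled by default)."""
    out = {"Remote Bytes Read": metrics["remote_bytes_read"]}
    if push_based_shuffle_enabled:
        out["Push Based Shuffle"] = {
            "Merged Fetch Fallback Count": metrics["merged_fetch_fallback_count"],
        }
    return out

m = {"remote_bytes_read": 4096, "merged_fetch_fallback_count": 0}
# Default configuration: the per-task JSON carries no push-based fields.
assert "Push Based Shuffle" not in shuffle_read_metrics_json(m, False)
# With the feature on, the metrics are logged as before.
assert "Push Based Shuffle" in shuffle_read_metrics_json(m, True)
```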
[jira] [Updated] (SPARK-42203) JsonProtocol should skip logging of push-based shuffle read metrics when push-based shuffle is disabled
[ https://issues.apache.org/jira/browse/SPARK-42203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-42203: --- Description:

This is a followup to SPARK-36620: When push-based shuffle is disabled (the default), I think that we should skip the logging of the new push-based shuffle read metrics. Because these metrics are logged for every task, they will add significant additional size to Spark event logs. It would be great to avoid this cost in cases where it's not necessary.

> JsonProtocol should skip logging of push-based shuffle read metrics when
> push-based shuffle is disabled
>
> Key: SPARK-42203
> URL: https://issues.apache.org/jira/browse/SPARK-42203
> Project: Spark
> Issue Type: Sub-task
> Components: Shuffle
> Affects Versions: 3.4.0
> Reporter: Josh Rosen
> Priority: Major
[jira] [Created] (SPARK-41541) Fix wrong child call in SQLShuffleWriteMetricsReporter.decRecordsWritten()
Josh Rosen created SPARK-41541: -- Summary: Fix wrong child call in SQLShuffleWriteMetricsReporter.decRecordsWritten() Key: SPARK-41541 URL: https://issues.apache.org/jira/browse/SPARK-41541 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 3.0.0 Reporter: Josh Rosen Assignee: Josh Rosen

In the {{SQLShuffleWriteMetricsReporter.decRecordsWritten}} method, a call to a child accidentally decrements _bytesWritten_ instead of {_}recordsWritten{_}:

{code:java}
override def decRecordsWritten(v: Long): Unit = {
  metricsReporter.decBytesWritten(v)
  _recordsWritten.set(_recordsWritten.value - v)
}
{code}

One of the situations where {{decRecordsWritten}} is called is while reverting shuffle writes from failed/canceled tasks. Due to the mixup in these calls, the _recordsWritten_ metric ends up _v_ records too high (since it wasn't decremented) and the _bytesWritten_ metric ends up _v_ too low, causing some failed tasks' write metrics to look like

{code:java}
{"Shuffle Bytes Written":-2109,"Shuffle Write Time":2923270,"Shuffle Records Written":2109}
{code}

instead of

{code:java}
{"Shuffle Bytes Written":0,"Shuffle Write Time":2923270,"Shuffle Records Written":0}
{code}

I'll submit a fix for this.
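The actual fix is a one-line change in the Scala reporter (decrement the records counter, not the bytes counter). This toy Python model just demonstrates the intended decrement behavior; class and method names are illustrative:

```python
class ShuffleWriteMetrics:
    """Toy model of the two counters involved in SPARK-41541."""
    def __init__(self):
        self.bytes_written = 0
        self.records_written = 0

    def inc(self, nbytes: int, nrecords: int) -> None:
        self.bytes_written += nbytes
        self.records_written += nrecords

    def dec_records_written(self, v: int) -> None:
        # Fixed behavior: decrement recordsWritten, not bytesWritten.
        self.records_written -= v

m = ShuffleWriteMetrics()
m.inc(2109 * 8, 2109)           # task writes some shuffle data...
m.dec_records_written(2109)     # ...then is canceled and its records reverted
assert m.records_written == 0   # no longer stuck at 2109
assert m.bytes_written == 2109 * 8  # bytes untouched by the records revert
```

With the bug, the same revert drove _bytesWritten_ negative while leaving _recordsWritten_ inflated, which is exactly the `-2109` / `2109` pairing in the example metrics above.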
[jira] [Updated] (SPARK-38542) UnsafeHashedRelation should serialize numKeys out
[ https://issues.apache.org/jira/browse/SPARK-38542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-38542: --- Labels: correctness (was: ) > UnsafeHashedRelation should serialize numKeys out > - > > Key: SPARK-38542 > URL: https://issues.apache.org/jira/browse/SPARK-38542 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: mcdull_zhang >Priority: Critical > Labels: correctness > Fix For: 3.3.0, 3.2.2 > > > At present, UnsafeHashedRelation does not write out numKeys during > serialization, so the numKeys of UnsafeHashedRelation obtained by > deserialization is equal to 0. The numFields of UnsafeRows returned by > UnsafeHashedRelation.keys() are all 0, which can lead to missing or incorrect > data. > > For example, in SubqueryBroadcastExec, the HashedRelation.keys() function is > called. > {code:java} > val broadcastRelation = child.executeBroadcast[HashedRelation]().value > val (iter, expr) = if (broadcastRelation.isInstanceOf[LongHashedRelation]) { > (broadcastRelation.keys(), HashJoin.extractKeyExprAt(buildKeys, index)) > } else { > (broadcastRelation.keys(), > BoundReference(index, buildKeys(index).dataType, > buildKeys(index).nullable)) > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
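The bug shape is a classic round-trip asymmetry: the writer never emitted numKeys, so the reader reconstructed it as 0. A toy Python sketch of the fixed round trip (the real change is in UnsafeHashedRelation's write/read methods; the byte format and names here are illustrative):

```python
import io
import struct

def write_relation(buf, num_keys: int, payload: bytes) -> None:
    buf.write(struct.pack(">i", num_keys))  # the fix: serialize numKeys out
    buf.write(payload)

def read_relation(buf):
    (num_keys,) = struct.unpack(">i", buf.read(4))
    return num_keys, buf.read()

buf = io.BytesIO()
write_relation(buf, num_keys=2, payload=b"rows...")
buf.seek(0)
num_keys, payload = read_relation(buf)
assert num_keys == 2   # before the fix this came back as 0, so keys()
                       # returned UnsafeRows with numFields == 0
assert payload == b"rows..."
```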
[jira] [Assigned] (SPARK-40261) DirectTaskResult meta should not be counted into result size
[ https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-40261: -- Assignee: Ziqi Liu

> DirectTaskResult meta should not be counted into result size
>
> Key: SPARK-40261
> URL: https://issues.apache.org/jira/browse/SPARK-40261
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Ziqi Liu
> Assignee: Ziqi Liu
> Priority: Major
>
> This issue has existed for a long time (since
> [https://github.com/liuzqt/spark/commit/c33e55008239f417764d589c1366371d18331686]).
> When calculating whether a result fetched by the driver exceeds the
> `spark.driver.maxResultSize` limit, the whole serialized task result size is
> taken into account, including the task metadata overhead
> ([accumUpdates|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L41])
> as well. However, the metadata should not be counted because it will be
> discarded by the driver immediately after being processed.
> This can lead to exceptions when running jobs with tons of tasks that
> actually return small results.
> Therefore we should only count
> [valueBytes|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala#L40]
> when calculating the result size limit.
> cc [~joshrosen]
[jira] [Resolved] (SPARK-40261) DirectTaskResult meta should not be counted into result size
[ https://issues.apache.org/jira/browse/SPARK-40261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-40261. Fix Version/s: 3.4.0 Resolution: Fixed Fixed by https://github.com/apache/spark/pull/37713

> DirectTaskResult meta should not be counted into result size
>
> Key: SPARK-40261
> URL: https://issues.apache.org/jira/browse/SPARK-40261
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.0
> Reporter: Ziqi Liu
> Assignee: Ziqi Liu
> Priority: Major
> Fix For: 3.4.0
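The before/after accounting can be sketched in a few lines (the limit value and function names are illustrative; the real change is in the driver's handling of serialized DirectTaskResults):

```python
MAX_RESULT_SIZE = 1024  # stand-in for spark.driver.maxResultSize

def counted_size_old(value_bytes: int, accum_updates_bytes: int) -> int:
    # Old behavior: the whole serialized DirectTaskResult counts against
    # the limit, including accumulator-update metadata the driver discards.
    return value_bytes + accum_updates_bytes

def counted_size_fixed(value_bytes: int, accum_updates_bytes: int) -> int:
    # Fixed behavior: only valueBytes (the actual result payload) counts.
    return value_bytes

# A job with tiny results but hefty per-task metric metadata:
value_bytes, accum_bytes = 64, 4096
assert counted_size_old(value_bytes, accum_bytes) > MAX_RESULT_SIZE       # spurious failure
assert counted_size_fixed(value_bytes, accum_bytes) <= MAX_RESULT_SIZE   # job succeeds
```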
[jira] [Resolved] (SPARK-40235) Use interruptible lock instead of synchronized in Executor.updateDependencies()
[ https://issues.apache.org/jira/browse/SPARK-40235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-40235. Fix Version/s: 3.4.0 Resolution: Fixed Fixed by [https://github.com/apache/spark/pull/37681] > Use interruptible lock instead of synchronized in > Executor.updateDependencies() > --- > > Key: SPARK-40235 > URL: https://issues.apache.org/jira/browse/SPARK-40235 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Fix For: 3.4.0 > > > This patch modifies the synchronization in {{Executor.updateDependencies()}} > in order to allow tasks to be interrupted while they are blocked and waiting > on other tasks to finish downloading dependencies. > This synchronization was added years ago in > [mesos/spark@{{{}7b9e96c{}}}|https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a] > in order to prevent concurrently-launching tasks from performing concurrent > dependency updates. If one task is downloading dependencies, all other > newly-launched tasks will block until the original dependency download is > complete. > Let's say that a Spark task launches, becomes blocked on a > {{updateDependencies()}} call, then is cancelled while it is blocked. > Although Spark will send a Thread.interrupt() to the canceled task, the task > will continue waiting because threads blocked on a {{synchronized}} won't > throw an InterruptedException in response to the interrupt. As a result, the > blocked thread will continue to wait until the other thread exits the > synchronized block. > In the wild, we saw a case where this happened and the thread remained > blocked for over 1 minute, causing the TaskReaper to kick in and > self-destruct the executor. > This PR aims to fix this problem by replacing the {{synchronized}} with a > ReentrantLock, which has a {{lockInterruptibly}} method. 
[jira] [Created] (SPARK-40263) Use interruptible lock instead of synchronized in TransportClientFactory.createClient()
Josh Rosen created SPARK-40263: -- Summary: Use interruptible lock instead of synchronized in TransportClientFactory.createClient() Key: SPARK-40263 URL: https://issues.apache.org/jira/browse/SPARK-40263 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.0 Reporter: Josh Rosen Followup to SPARK-40235: we should apply a similar fix in TransportClientFactory.createClient -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-40211) Allow executeTake() / collectLimit's number of starting partitions to be customized
[ https://issues.apache.org/jira/browse/SPARK-40211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-40211. Fix Version/s: 3.4.0 Assignee: Ziqi Liu Resolution: Fixed Resolved by [https://github.com/apache/spark/pull/37661]

> Allow executeTake() / collectLimit's number of starting partitions to be
> customized
>
> Key: SPARK-40211
> URL: https://issues.apache.org/jira/browse/SPARK-40211
> Project: Spark
> Issue Type: Story
> Components: Spark Core, SQL
> Affects Versions: 3.4.0
> Reporter: Ziqi Liu
> Assignee: Ziqi Liu
> Priority: Major
> Fix For: 3.4.0
>
> Today, Spark's executeTake() code allows the limitScaleUpFactor to be
> customized but does not allow the initial number of partitions to be
> customized: it's currently hardcoded to {{1}}.
> We should add a configuration so that the initial partition count can be
> customized. By setting this new configuration to a high value we could
> effectively mitigate the "run multiple jobs" overhead in {{take}} behavior.
> We could also set it to higher-than-1-but-still-small values (like, say,
> {{10}}) to achieve a middle-ground trade-off.
>
> Essentially, we need to make {{numPartsToTry = 1L}}
> ([code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L481])
> customizable. We should do this via a new SQL conf, similar to the
> {{limitScaleUpFactor}} conf.
>
> Spark has several near-duplicate versions of this code ([see code
> search|https://github.com/apache/spark/search?q=numPartsToTry+%3D+1]) in:
> * SparkPlan
> * RDD
> * pyspark rdd
> Also, {{limitScaleUpFactor}} is not supported in pyspark either. So for now,
> I will focus on the Scala side first, leaving the Python side untouched and
> meanwhile syncing with the PySpark maintainers. Depending on the progress we
> can do them all in one PR, or make the Scala-side change first and leave the
> PySpark change as a follow-up.
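The incremental-scan loop being tuned here can be sketched as follows. This is a simplified model of Spark's executeTake() behavior, not its exact code; `initial_num_parts`, `scale_up_factor`, and `run_job` are illustrative names (the real scale-up logic also factors in rows seen so far):

```python
def execute_take(n, total_partitions, run_job,
                 initial_num_parts=1, scale_up_factor=4):
    """Collect up to n rows by scanning partitions in successively larger
    jobs. The first job scans `initial_num_parts` partitions (historically
    hardcoded to 1); each retry multiplies the count by `scale_up_factor`.
    `run_job(start, end)` returns the rows found in partitions [start, end).
    Returns (rows, number_of_jobs_launched)."""
    rows, scanned, jobs = [], 0, 0
    num_parts_to_try = initial_num_parts
    while scanned < total_partitions and len(rows) < n:
        end = min(scanned + num_parts_to_try, total_partitions)
        rows.extend(run_job(scanned, end))
        jobs += 1
        scanned = end
        num_parts_to_try *= scale_up_factor
    return rows[:n], jobs

# 100 partitions, one matching row per partition; take(40):
run_job = lambda s, e: [f"row-{p}" for p in range(s, e)]
_, jobs_default = execute_take(40, 100, run_job, initial_num_parts=1)
_, jobs_tuned = execute_take(40, 100, run_job, initial_num_parts=10)
assert jobs_tuned < jobs_default  # a larger starting scan means fewer jobs
```

Setting the new knob high collapses the loop toward a single job, which is the "mitigate the run-multiple-jobs overhead" trade-off described above.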
[jira] [Updated] (SPARK-40235) Use interruptible lock instead of synchronized in Executor.updateDependencies()
[ https://issues.apache.org/jira/browse/SPARK-40235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-40235: --- Description: This patch modifies the synchronization in {{Executor.updateDependencies()}} in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies. This synchronization was added years ago in [mesos/spark@{{{}7b9e96c{}}}|https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a] in order to prevent concurrently-launching tasks from performing concurrent dependency updates. If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete. Let's say that a Spark task launches, becomes blocked on a {{updateDependencies()}} call, then is cancelled while it is blocked. Although Spark will send a Thread.interrupt() to the canceled task, the task will continue waiting because threads blocked on a {{synchronized}} won't throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block. In the wild, we saw a case where this happened and the thread remained blocked for over 1 minute, causing the TaskReaper to kick in and self-destruct the executor. This PR aims to fix this problem by replacing the {{synchronized}} with a ReentrantLock, which has a {{lockInterruptibly}} method. was: This patch modifies the synchronization in {{Executor.updateDependencies()}} in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies. 
This synchronization was added years ago in [mesos/spark@{{{}7b9e96c{}}}|https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a] in order to prevent concurrently-launching tasks from performing concurrent dependency updates (file downloads, and, later, library installation). If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete. Let's say that a Spark task launches, becomes blocked on a {{updateDependencies()}} call, then is cancelled while it is blocked. Although Spark will send a Thread.interrupt() to the canceled task, the task will continue waiting because threads blocked on a {{synchronized}} won't throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block. In the wild, we saw a case where this happened and the thread remained blocked for over 1 minute, causing the TaskReaper to kick in and self-destruct the executor. This PR aims to fix this problem by replacing the {{synchronized}} with a ReentrantLock, which has a {{lockInterruptibly}} method. > Use interruptible lock instead of synchronized in > Executor.updateDependencies() > --- > > Key: SPARK-40235 > URL: https://issues.apache.org/jira/browse/SPARK-40235 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > This patch modifies the synchronization in {{Executor.updateDependencies()}} > in order to allow tasks to be interrupted while they are blocked and waiting > on other tasks to finish downloading dependencies. > This synchronization was added years ago in > [mesos/spark@{{{}7b9e96c{}}}|https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a] > in order to prevent concurrently-launching tasks from performing concurrent > dependency updates. 
If one task is downloading dependencies, all other > newly-launched tasks will block until the original dependency download is > complete. > Let's say that a Spark task launches, becomes blocked on an > {{updateDependencies()}} call, then is cancelled while it is blocked. > Although Spark will send a Thread.interrupt() to the cancelled task, the task > will continue waiting because threads blocked on a {{synchronized}} won't > throw an InterruptedException in response to the interrupt. As a result, the > blocked thread will continue to wait until the other thread exits the > synchronized block. > In the wild, we saw a case where this happened and the thread remained > blocked for over 1 minute, causing the TaskReaper to kick in and > self-destruct the executor. > This PR aims to fix this problem by replacing the {{synchronized}} with a > ReentrantLock, which has a {{lockInterruptibly}} method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (SPARK-40235) Use interruptible lock instead of synchronized in Executor.updateDependencies()
Josh Rosen created SPARK-40235: -- Summary: Use interruptible lock instead of synchronized in Executor.updateDependencies() Key: SPARK-40235 URL: https://issues.apache.org/jira/browse/SPARK-40235 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.0 Reporter: Josh Rosen Assignee: Josh Rosen This patch modifies the synchronization in {{Executor.updateDependencies()}} in order to allow tasks to be interrupted while they are blocked and waiting on other tasks to finish downloading dependencies. This synchronization was added years ago in [mesos/spark@{{{}7b9e96c{}}}|https://github.com/mesos/spark/commit/7b9e96c99206c0679d9925e0161fde738a5c7c3a] in order to prevent concurrently-launching tasks from performing concurrent dependency updates (file downloads, and, later, library installation). If one task is downloading dependencies, all other newly-launched tasks will block until the original dependency download is complete. Let's say that a Spark task launches, becomes blocked on an {{updateDependencies()}} call, then is cancelled while it is blocked. Although Spark will send a Thread.interrupt() to the cancelled task, the task will continue waiting because threads blocked on a {{synchronized}} won't throw an InterruptedException in response to the interrupt. As a result, the blocked thread will continue to wait until the other thread exits the synchronized block. In the wild, we saw a case where this happened and the thread remained blocked for over 1 minute, causing the TaskReaper to kick in and self-destruct the executor. This PR aims to fix this problem by replacing the {{synchronized}} with a ReentrantLock, which has a {{lockInterruptibly}} method. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
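The behavioral difference described above can be demonstrated outside of Spark. The following is a minimal standalone sketch (not Spark's actual Executor code): a thread blocked on {{lockInterruptibly()}} responds to Thread.interrupt() immediately, whereas a thread blocked entering a {{synchronized}} monitor would keep waiting until the holder released it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockDemo {
    // Returns true if the waiting thread was interrupted while blocked on the lock.
    static boolean demo() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch holderReady = new CountDownLatch(1);
        AtomicBoolean waiterInterrupted = new AtomicBoolean(false);

        // "Task A": grabs the lock and holds it, simulating a long dependency download.
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                holderReady.countDown();
                Thread.sleep(10_000); // long download
            } catch (InterruptedException ignored) {
            } finally {
                lock.unlock();
            }
        });

        // "Task B": blocks waiting for the lock. With a synchronized block, an
        // interrupt here would be ignored until the monitor was finally acquired;
        // lockInterruptibly() throws InterruptedException right away.
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock();
            } catch (InterruptedException e) {
                waiterInterrupted.set(true);
            }
        });

        try {
            holder.start();
            holderReady.await();
            waiter.start();
            Thread.sleep(200);  // let "Task B" block on the lock
            waiter.interrupt(); // simulate task cancellation
            waiter.join();
            holder.interrupt(); // clean up the holder
            holder.join();
        } catch (InterruptedException e) {
            return false;
        }
        return waiterInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted while blocked: " + demo());
    }
}
```

With a plain {{synchronized}} block, the waiter in this demo would stay blocked for the full 10 seconds, mirroring the TaskReaper timeout described in the ticket.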
[jira] [Assigned] (SPARK-40106) Task failure handlers should always run if the task failed
[ https://issues.apache.org/jira/browse/SPARK-40106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-40106: -- Assignee: Ryan Johnson > Task failure handlers should always run if the task failed > -- > > Key: SPARK-40106 > URL: https://issues.apache.org/jira/browse/SPARK-40106 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: Ryan Johnson >Assignee: Ryan Johnson >Priority: Major > Fix For: 3.4.0 > > > Today, if a task body succeeds, but a task completion listener fails, task > failure listeners are not called -- even though the task has indeed failed at > that point. > If a completion listener fails, and failure listeners were not previously > invoked, we should invoke them before running the remaining completion > listeners. > Such a change would increase the utility of task listeners, especially ones > intended to assist with task cleanup. > To give one arbitrary example, code like this appears at several places in > the code (taken from {{executeTask}} method of FileFormatWriter.scala): > {code:java} > try { > Utils.tryWithSafeFinallyAndFailureCallbacks(block = { > // Execute the task to write rows out and commit the task. > dataWriter.writeWithIterator(iterator) > dataWriter.commit() > })(catchBlock = { > // If there is an error, abort the task > dataWriter.abort() > logError(s"Job $jobId aborted.") > }, finallyBlock = { > dataWriter.close() > }) > } catch { > case e: FetchFailedException => > throw e > case f: FileAlreadyExistsException if > SQLConf.get.fastFailFileFormatOutput => > // If any output file to write already exists, it does not make sense > to re-run this task. > // We throw the exception and let Executor throw ExceptionFailure to > abort the job. 
> throw new TaskOutputFileAlreadyExistException(f) > case t: Throwable => > throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t) > }{code} > If failure listeners were reliably called, the above idiom could potentially > be factored out as two failure listeners plus a completion listener, and > reused rather than duplicated. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-40106) Task failure handlers should always run if the task failed
[ https://issues.apache.org/jira/browse/SPARK-40106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-40106. Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 37531 [https://github.com/apache/spark/pull/37531] > Task failure handlers should always run if the task failed > -- > > Key: SPARK-40106 > URL: https://issues.apache.org/jira/browse/SPARK-40106 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: Ryan Johnson >Priority: Major > Fix For: 3.4.0 > > > Today, if a task body succeeds, but a task completion listener fails, task > failure listeners are not called -- even though the task has indeed failed at > that point. > If a completion listener fails, and failure listeners were not previously > invoked, we should invoke them before running the remaining completion > listeners. > Such a change would increase the utility of task listeners, especially ones > intended to assist with task cleanup. > To give one arbitrary example, code like this appears at several places in > the code (taken from {{executeTask}} method of FileFormatWriter.scala): > {code:java} > try { > Utils.tryWithSafeFinallyAndFailureCallbacks(block = { > // Execute the task to write rows out and commit the task. > dataWriter.writeWithIterator(iterator) > dataWriter.commit() > })(catchBlock = { > // If there is an error, abort the task > dataWriter.abort() > logError(s"Job $jobId aborted.") > }, finallyBlock = { > dataWriter.close() > }) > } catch { > case e: FetchFailedException => > throw e > case f: FileAlreadyExistsException if > SQLConf.get.fastFailFileFormatOutput => > // If any output file to write already exists, it does not make sense > to re-run this task. > // We throw the exception and let Executor throw ExceptionFailure to > abort the job. 
> throw new TaskOutputFileAlreadyExistException(f) > case t: Throwable => > throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t) > }{code} > If failure listeners were reliably called, the above idiom could potentially > be factored out as two failure listeners plus a completion listener, and > reused rather than duplicated. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
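The proposed semantics ("if a completion listener fails, run the failure listeners before the remaining completion listeners") can be sketched outside of Spark. The class below is a hypothetical stand-in for the relevant parts of TaskContext, not Spark's real implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the proposal: if the task body succeeded but a completion
// listener throws, failure listeners run before the remaining completion
// listeners, so cleanup hooks always observe the failure.
class TaskListenerSketch {
    final List<Runnable> completionListeners = new ArrayList<>();
    final List<Runnable> failureListeners = new ArrayList<>();
    private boolean failureListenersInvoked = false;

    void runTask(Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            // Task body failed: notify failure listeners first, then rethrow.
            invokeFailureListeners();
            throw e;
        } finally {
            for (Runnable c : completionListeners) {
                try {
                    c.run();
                } catch (RuntimeException e) {
                    // A completion listener failed: the task has now failed, so
                    // invoke failure listeners before the remaining completion
                    // listeners run. (Errors are swallowed here for brevity.)
                    invokeFailureListeners();
                }
            }
        }
    }

    private void invokeFailureListeners() {
        if (failureListenersInvoked) return; // run failure listeners at most once
        failureListenersInvoked = true;
        for (Runnable f : failureListeners) f.run();
    }
}
```

Under this scheme, the abort/close idiom from FileFormatWriter could plausibly be expressed as a failure listener (abort) plus a completion listener (close), as the ticket suggests.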
[jira] [Updated] (SPARK-36176) Expose tableExists in pyspark.sql.catalog
[ https://issues.apache.org/jira/browse/SPARK-36176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-36176: --- Fix Version/s: 3.3.0 (was: 3.2.0) > Expose tableExists in pyspark.sql.catalog > - > > Key: SPARK-36176 > URL: https://issues.apache.org/jira/browse/SPARK-36176 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 3.1.2 >Reporter: Dominik Gehl >Assignee: Dominik Gehl >Priority: Minor > Fix For: 3.3.0 > > > Expose in PySpark the tableExists method, which is part of the Scala implementation -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36176) Expose tableExists in pyspark.sql.catalog
[ https://issues.apache.org/jira/browse/SPARK-36176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578592#comment-17578592 ] Josh Rosen commented on SPARK-36176: Changed the Fix Version on JIRA: this landed in 3.3.0, not 3.2.0 > Expose tableExists in pyspark.sql.catalog > - > > Key: SPARK-36176 > URL: https://issues.apache.org/jira/browse/SPARK-36176 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 3.1.2 >Reporter: Dominik Gehl >Assignee: Dominik Gehl >Priority: Minor > Fix For: 3.2.0 > > > Expose in PySpark the tableExists method, which is part of the Scala implementation -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-39983) Should not cache unserialized broadcast relations on the driver
[ https://issues.apache.org/jira/browse/SPARK-39983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-39983. Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 37413 [https://github.com/apache/spark/pull/37413] > Should not cache unserialized broadcast relations on the driver > --- > > Key: SPARK-39983 > URL: https://issues.apache.org/jira/browse/SPARK-39983 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: Alex Balikov >Assignee: Alex Balikov >Priority: Minor > Fix For: 3.4.0 > > > In TorrentBroadcast.writeBlocks we store the unserialized broadcast object in > addition to the serialized version of it - > {code:java} > private def writeBlocks(value: T): Int = { > import StorageLevel._ > // Store a copy of the broadcast variable in the driver so that tasks run > on the driver > // do not create a duplicate copy of the broadcast variable's value. > val blockManager = SparkEnv.get.blockManager > if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, > tellMaster = false)) { > throw new SparkException(s"Failed to store $broadcastId in > BlockManager") > } > {code} > In case of broadcast relations, these objects can be fairly large (60MB in > one observed case) and are not strictly necessary on the driver. > Add the option to not keep the unserialized versions of the objects. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-39983) Should not cache unserialized broadcast relations on the driver
[ https://issues.apache.org/jira/browse/SPARK-39983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reassigned SPARK-39983: -- Assignee: Alex Balikov > Should not cache unserialized broadcast relations on the driver > --- > > Key: SPARK-39983 > URL: https://issues.apache.org/jira/browse/SPARK-39983 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.0 >Reporter: Alex Balikov >Assignee: Alex Balikov >Priority: Minor > > In TorrentBroadcast.writeBlocks we store the unserialized broadcast object in > addition to the serialized version of it - > {code:java} > private def writeBlocks(value: T): Int = { > import StorageLevel._ > // Store a copy of the broadcast variable in the driver so that tasks run > on the driver > // do not create a duplicate copy of the broadcast variable's value. > val blockManager = SparkEnv.get.blockManager > if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, > tellMaster = false)) { > throw new SparkException(s"Failed to store $broadcastId in > BlockManager") > } > {code} > In case of broadcast relations, these objects can be fairly large (60MB in > one observed case) and are not strictly necessary on the driver. > Add the option to not keep the unserialized versions of the objects. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39973) Avoid noisy warnings logs when spark.scheduler.listenerbus.metrics.maxListenerClassesTimed = 0
[ https://issues.apache.org/jira/browse/SPARK-39973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-39973: --- Description: If {{spark.scheduler.listenerbus.metrics.maxListenerClassesTimed}} has been set to {{0}} to disable listener timers then listener registration will trigger noisy warnings like {code:java} LiveListenerBusMetrics: Not measuring processing time for listener class org.apache.spark.sql.util.ExecutionListenerBus because a maximum of 0 listener classes are already timed.{code} We should change the code to not print this warning when maxListenerClassesTimed = 0. I don't plan to work on this myself. was: If {{spark.scheduler.listenerbus.metrics.maxListenerClassesTimed}} has been set to {{0}} to disable listener timers then listener registration will trigger noisy warnings like {code:java} LiveListenerBusMetrics: Not measuring processing time for listener class org.apache.spark.sql.util.ExecutionListenerBus because a maximum of 0 listener classes are already timed.{code} We should change the code to not print this warning when max listeners is zero. > Avoid noisy warnings logs when > spark.scheduler.listenerbus.metrics.maxListenerClassesTimed = 0 > -- > > Key: SPARK-39973 > URL: https://issues.apache.org/jira/browse/SPARK-39973 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Josh Rosen >Priority: Minor > > If {{spark.scheduler.listenerbus.metrics.maxListenerClassesTimed}} has been > set to {{0}} to disable listener timers then listener registration will > trigger noisy warnings like > {code:java} > LiveListenerBusMetrics: Not measuring processing time for listener class > org.apache.spark.sql.util.ExecutionListenerBus because a maximum of 0 > listener classes are already timed.{code} > We should change the code to not print this warning when > maxListenerClassesTimed = 0. > I don't plan to work on this myself. 
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39973) Avoid noisy warnings logs when spark.scheduler.listenerbus.metrics.maxListenerClassesTimed = 0
Josh Rosen created SPARK-39973: -- Summary: Avoid noisy warnings logs when spark.scheduler.listenerbus.metrics.maxListenerClassesTimed = 0 Key: SPARK-39973 URL: https://issues.apache.org/jira/browse/SPARK-39973 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.4.0 Reporter: Josh Rosen If {{spark.scheduler.listenerbus.metrics.maxListenerClassesTimed}} has been set to {{0}} to disable listener timers then listener registration will trigger noisy warnings like {code:java} LiveListenerBusMetrics: Not measuring processing time for listener class org.apache.spark.sql.util.ExecutionListenerBus because a maximum of 0 listener classes are already timed.{code} We should change the code to not print this warning when max listeners is zero. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39901) Reconsider design of ignoreCorruptFiles feature
Josh Rosen created SPARK-39901: -- Summary: Reconsider design of ignoreCorruptFiles feature Key: SPARK-39901 URL: https://issues.apache.org/jira/browse/SPARK-39901 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Josh Rosen I'm filing this ticket as a followup to the discussion at [https://github.com/apache/spark/pull/36775#issuecomment-1148136217] regarding the `ignoreCorruptFiles` feature: the current implementation is based towards considering a broad range of IOExceptions to be corruption, but this is likely overly-broad and might mis-identify transient errors as corruption (causing non-corrupt data to be erroneously discarded). SPARK-39389 fixes one instance of that problem, but we are still vulnerable to similar issues because of the overall design of this feature. I think we should reconsider the design of this feature: maybe we should switch the default behavior so that only an explicit allowlist of known corruption exceptions can cause files to be skipped. This could be done through involvement of other parts of the code, e.g. rewrapping exceptions into a `CorruptFileException` so higher layers can positively identify corruption. Any changes to behavior here could potentially impact users jobs, so we'd need to think carefully about when we want to change (in a 3.x release? 4.x?) and how we want to provide escape hatches (e.g. configs to revert back to old behavior). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
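The allowlist-plus-rewrapping idea from this ticket can be sketched in a few lines. Everything here is hypothetical (`CorruptFileException` is the name floated in the ticket; the classifier and its EOFException allowlist are illustrative assumptions, not Spark code):

```java
import java.io.EOFException;
import java.io.IOException;

// Sketch of the proposed design: the reading layer rewraps only
// positively-identified corruption into a marker exception, and the scan layer
// skips a file under ignoreCorruptFiles only when it sees that marker.
// Transient errors (e.g. network timeouts) are left unwrapped and fail the task.
class CorruptFileException extends IOException {
    CorruptFileException(String path, Throwable cause) {
        super("Corrupt file: " + path, cause);
    }
}

class CorruptionSketch {
    // Reader layer: allowlist of exceptions known to indicate corruption.
    // Here, only a truncated file (EOFException) qualifies; the real list
    // would be built up case by case.
    static IOException classify(String path, IOException e) {
        return (e instanceof EOFException) ? new CorruptFileException(path, e) : e;
    }

    // Scan layer: skip the file only on a positive corruption marker.
    static boolean shouldSkip(IOException e) {
        return e instanceof CorruptFileException;
    }
}
```

The point of the two layers is that the skip decision no longer depends on guessing from a raw IOException: by the time the scan sees the error, corruption has either been positively identified or it has not.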
[jira] [Created] (SPARK-39864) ExecutionListenerManager's registration of the ExecutionListenerBus should be lazy
Josh Rosen created SPARK-39864: -- Summary: ExecutionListenerManager's registration of the ExecutionListenerBus should be lazy Key: SPARK-39864 URL: https://issues.apache.org/jira/browse/SPARK-39864 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.0.0 Reporter: Josh Rosen Assignee: Josh Rosen Today, ExecutionListenerManager eagerly registers an ExecutionListenerBus SparkListener when it is created, even if the SparkSession has no query execution listeners registered. In applications with many short-lived SparkSessions, this can cause a buildup of empty listeners on the shared listener bus, increasing Spark listener processing times on the driver. If we make the registration lazy then we avoid this driver-side listener performance overhead. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
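The lazy-registration idea can be sketched with hypothetical names (this is not Spark's ExecutionListenerManager, just the shape of the fix): defer the shared-bus hookup until the first query execution listener is actually registered, so sessions that never add one contribute nothing to the shared bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the shared Spark listener bus.
class SharedBus {
    final List<Object> listeners = new ArrayList<>();
    void addToSharedBus(Object listener) { listeners.add(listener); }
}

// Sketch of a per-session manager that registers itself on the shared bus
// lazily, and exactly once, on the first register() call.
class LazyListenerManagerSketch {
    private final SharedBus bus;
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final List<Object> queryListeners = new ArrayList<>();

    LazyListenerManagerSketch(SharedBus bus) {
        this.bus = bus; // note: no bus registration in the constructor
    }

    void register(Object queryExecutionListener) {
        // First registration hooks this session onto the shared bus, once.
        if (registered.compareAndSet(false, true)) {
            bus.addToSharedBus(this);
        }
        queryListeners.add(queryExecutionListener);
    }
}
```

With this shape, short-lived sessions that never call register() add no listener to the shared bus, which is exactly the driver-side overhead the ticket wants to avoid.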
[jira] [Updated] (SPARK-39833) Filtered parquet data frame count() and show() produce inconsistent results when spark.sql.parquet.filterPushdown is true
[ https://issues.apache.org/jira/browse/SPARK-39833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-39833: --- Labels: correctness (was: ) > Filtered parquet data frame count() and show() produce inconsistent results > when spark.sql.parquet.filterPushdown is true > - > > Key: SPARK-39833 > URL: https://issues.apache.org/jira/browse/SPARK-39833 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.1 >Reporter: Michael Allman >Priority: Major > Labels: correctness > > One of our data scientists discovered a problem wherein a data frame > `.show()` call printed non-empty results, but `.count()` printed 0. I've > narrowed the issue to a small, reproducible test case which exhibits this > aberrant behavior. In pyspark, run the following code: > {code:python} > from pyspark.sql.types import * > parquet_pushdown_bug_df = spark.createDataFrame([{"COL0": int(0)}], > schema=StructType(fields=[StructField("COL0",IntegerType(),True)])) > parquet_pushdown_bug_df.repartition(1).write.mode("overwrite").parquet("parquet_pushdown_bug/col0=0/parquet_pushdown_bug.parquet") > reread_parquet_pushdown_bug_df = spark.read.parquet("parquet_pushdown_bug") > reread_parquet_pushdown_bug_df.filter("col0 = 0").show() > print(reread_parquet_pushdown_bug_df.filter("col0 = 0").count()) > {code} > In my usage, this prints a data frame with 1 row and a count of 0. However, > disabling `spark.sql.parquet.filterPushdown` produces consistent results: > {code:python} > spark.conf.set("spark.sql.parquet.filterPushdown", False) > reread_parquet_pushdown_bug_df.filter("col0 = 0").show() > reread_parquet_pushdown_bug_df.filter("col0 = 0").count() > {code} > This will print the same data frame, however it will print a count of 1. The > key to triggering this bug is not just enabling > `spark.sql.parquet.filterPushdown` (which is enabled by default). 
The case of > the column in the data frame (before writing) must differ from the case of > the partition column in the file path, i.e. COL0 versus col0 or col0 versus > COL0. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39847) Race condition related to interruption of task threads while they are in RocksDBLoader.loadLibrary()
Josh Rosen created SPARK-39847: -- Summary: Race condition related to interruption of task threads while they are in RocksDBLoader.loadLibrary() Key: SPARK-39847 URL: https://issues.apache.org/jira/browse/SPARK-39847 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Josh Rosen Assignee: Josh Rosen One of our workloads experienced a rare failure in `RocksDBLoader` {code:java} Caused by: java.lang.IllegalThreadStateException at java.lang.Thread.start(Thread.java:708) at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51) {code} After investigation, we determined that it was due to task cancellation: if the task which starts the RocksDB library loading is interrupted, another thread may begin a load and crash with the thread state exception. Skimming through the code in [RocksDBLoader|https://github.com/databricks/runtime/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala], I spot a potential race condition: * Although the native JNI call is uninterruptible, the thread which calls loadLibrary is still interruptible. Let’s call that thread the “task thread”. * Say we have two tasks, A and B, which both want to load the JNI library. * Say that Task A wins the race to perform the load and enters the synchronized block in loadLibrary(), spawns a child thread to perform the actual loading, then blocks in the loadLibraryThread.join() call. * If Task A is interrupted, an InterruptedException will be thrown and it will exit the loadLibrary synchronized block. * At this point, Task B enters the synchronized block and sees that exception == null because the loading thread is still running, so it calls loadLibraryThread.start() and hits the thread state error. 
One way to fix this is to add {code:java} if (loadLibraryThread.getState == Thread.State.NEW) { loadLibraryThread.start() }{code} to ensure that only one thread starts the loadLibraryThread. If the original starter thread is interrupted then a new thread will encounter this block, skip the start(), proceed to the join() and block on the original load thread. I will submit a PR with this fix. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
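The guard proposed above can be exercised standalone. The demo below (a simplified stand-in, not the real RocksDBLoader) shows why the check matters: Thread.start() may only be called once, so a later caller must check for Thread.State.NEW before starting, then fall through to join() on the already-running thread.

```java
// Demonstrates the start-once guard: without the getState() check, a second
// caller entering the critical section would hit IllegalThreadStateException.
public class LoadGuardDemo {
    static final Thread loadLibraryThread = new Thread(() -> {
        try {
            Thread.sleep(100); // pretend to load a native library
        } catch (InterruptedException ignored) {
        }
    });

    // Mimics the shape of the loader's critical section: safe to enter from
    // several threads, even if an earlier caller was interrupted mid-join().
    static synchronized void loadLibrary() throws InterruptedException {
        if (loadLibraryThread.getState() == Thread.State.NEW) {
            loadLibraryThread.start(); // only the very first caller starts it
        }
        loadLibraryThread.join(); // every other caller just waits for the load
    }

    public static void main(String[] args) throws InterruptedException {
        loadLibrary(); // first caller starts the loading thread
        loadLibrary(); // second caller would have crashed without the guard;
                       // now it skips start() and joins harmlessly
        System.out.println("library loaded once, no IllegalThreadStateException");
    }
}
```

A plain Thread can never return to the NEW state after start(), which is what makes this check a safe "start at most once" gate.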
[jira] [Updated] (SPARK-39771) If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run
[ https://issues.apache.org/jira/browse/SPARK-39771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-39771: --- Description: [According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], {{Partitioner.defaultPartitioner}} will use the maximum number of RDD partitions as its partition count when {{spark.default.parallelism}} is not set. If that number of upstream partitions is very large then this can result in shuffles where {{{}numMappers * numReducers = numMappers^2{}}}, which can cause various problems that prevent the job from successfully running. To help users identify when they have run into this problem, I think we should add warning logs to Spark. As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a {{reduceByKey}} on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions: {code:java} scala> sc.parallelize(1 to 10, 10).map(x => (x, x)).reduceByKey(_ + _).toDebugString res7: String = (10) ShuffledRDD[21] at reduceByKey at :25 [] +-(10) MapPartitionsRDD[20] at map at :25 [] | ParallelCollectionRDD[19] at parallelize at :25 [] {code} This results in the creation of 10 billion shuffle blocks, so if this job _does_ run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility) then the map statuses would be ~1.25 gigabytes (and the actual size is probably much larger)! 
I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on {{spark.default.parallelism}} being set but it was run on an environment lacking a value for that config, or (b) their input data significantly grew in size. These scenarios may be rare, but they can be frustrating to debug (especially if a failure occurs midway through a long-running job). I think we should do something to handle this scenario. A good starting point might be for {{Partitioner.defaultPartitioner}} to log a warning when the default partition size exceeds some threshold. In addition, I think it might be a good idea to log a similar warning in {{MapOutputTrackerMaster}} right before we start trying to serialize map statuses: in a real-world situation where this problem cropped up, the map stage ran successfully but the driver crashed when serializing map statuses. Putting a warning about partition counts here makes it more likely that users will spot that error in the logs and be able to identify the source of the problem (compared to a warning that appears much earlier in the job and therefore much farther from the likely site of a crash). was: [According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], {{Partitioner.defaultPartitioner}} will use the maximum number of RDD partitions as its partition count when {{spark.default.parallelism}} is not set. If that number of upstream partitions is very large then this can result in shuffles where {{{}numMappers * numReducers = numMappers^2{}}}, which can cause various problems that prevent the job from successfully running. To help users identify when they have run into this problem, I think we should add warning logs to Spark. 
As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a {{reduceByKey}} on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions: {code:java} scala> sc.parallelize(1 to 10, 10).map(x => (x, x)).reduceByKey(_ + _).toDebugString res7: String = (10) ShuffledRDD[21] at reduceByKey at :25 [] +-(10) MapPartitionsRDD[20] at map at :25 [] | ParallelCollectionRDD[19] at parallelize at :25 [] {code} This results in the creation of 10 billion shuffle blocks, so if this job _does_ run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility) then the map statuses would be ~1.25 gigabytes! I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on {{spark.default.parallelism}} being set but it was run on an environment lacking a value for that config, or (b) their input data significantly grew in size. These scenarios may be rare, but
[jira] [Created] (SPARK-39771) If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run
Josh Rosen created SPARK-39771: -- Summary: If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run Key: SPARK-39771 URL: https://issues.apache.org/jira/browse/SPARK-39771 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Josh Rosen [According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], {{Partitioner.defaultPartitioner}} will use the maximum number of RDD partitions as its partition count when {{spark.default.parallelism}} is not set. If that number of upstream partitions is very large then this can result in shuffles where {{{}numMappers * numReducers = numMappers^2{}}}, which can cause various problems that prevent the job from successfully running. To help users identify when they have run into this problem, I think we should add warning logs to Spark. As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a {{reduceByKey}} on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions: {code:java} scala> sc.parallelize(1 to 10, 10).map(x => (x, x)).reduceByKey(_ + _).toDebugString res7: String = (10) ShuffledRDD[21] at reduceByKey at :25 [] +-(10) MapPartitionsRDD[20] at map at :25 [] | ParallelCollectionRDD[19] at parallelize at :25 [] {code} This results in the creation of 10 billion shuffle blocks, so if this job _does_ run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility) then the map statuses would be ~1.25 gigabytes! 
I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on {{spark.default.parallelism}} being set but it was run on an environment lacking a value for that config, or (b) their input data significantly grew in size. These scenarios may be rare, but they can be frustrating to debug (especially if a failure occurs midway through a long-running job). I think we should do something to handle this scenario. A good starting point might be for {{Partitioner.defaultPartitioner}} to log a warning when the default partition size exceeds some threshold. In addition, I think it might be a good idea to log a similar warning in {{MapOutputTrackerMaster}} right before we start trying to serialize map statuses: in a real-world situation where this problem cropped up, the map stage ran successfully but the driver crashed when serializing map statuses. Putting a warning about partition counts here makes it more likely that users will spot that error in the logs and be able to identify the source of the problem (compared to a warning that appears much earlier in the job and therefore much farther from the likely site of a crash). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
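The ~1.25 gigabyte figure quoted in the ticket is a straightforward back-of-envelope calculation, reproduced here for clarity:

```java
// Back-of-envelope check of the figure above: with 100,000 map partitions
// shuffling into 100,000 reduce partitions (defaultPartitioner copied the
// upstream count), even an optimistic single bit per mapper/reducer pair
// needs numMappers * numReducers / 8 bytes.
public class MapStatusSize {
    public static void main(String[] args) {
        long numMappers = 100_000L;
        long numReducers = 100_000L;
        long shuffleBlocks = numMappers * numReducers; // 10 billion blocks
        long bytes = shuffleBlocks / 8;                // one bit per block
        System.out.println(shuffleBlocks + " shuffle blocks");
        System.out.println(bytes / 1e9 + " GB of map statuses"); // 1.25 GB
    }
}
```

Note that the long literals matter: `100_000 * 100_000` as an int multiplication would overflow before ever reaching the division.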
[jira] [Updated] (SPARK-38787) Possible correctness issue on stream-stream join when handling edge case
[ https://issues.apache.org/jira/browse/SPARK-38787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-38787: --- Labels: correctness (was: ) > Possible correctness issue on stream-stream join when handling edge case > > > Key: SPARK-38787 > URL: https://issues.apache.org/jira/browse/SPARK-38787 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 3.2.1 >Reporter: Anish Shrigondekar >Assignee: Anish Shrigondekar >Priority: Major > Labels: correctness > Fix For: 3.3.0, 3.2.2 > > > There was an issue on NPE in stream-stream join. SPARK-35659 fixed the issue > “partially”, and the part of fix is to ignore the null value from the last > index on swapping elements in the list so the null value in the last index is > going to be effectively dropped. If it is due to out of sync between > numValues and the actual number of elements, this works effectively as a > correction. > This unfortunately opens the possibility of another “correctness” issue; the > reason we swap the value with last index is effectively to remove the value > in the current index. Doing nothing in any case would mean “we don’t remove > the value in the current index”, whereas the caller would expect the value as > dropped, and even for outer join they may be emitted as left/right null join > output while the value can be re-evaluated and emitted again.
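The swap-with-last-index idiom the ticket describes can be sketched in a few lines (an illustrative model only, not Spark's actual state store code): removing an element by overwriting it with the last element is O(1), but silently skipping the swap leaves the "removed" value in place.

```python
def swap_remove(values, i):
    # O(1) removal: overwrite index i with the last element, then drop the tail.
    values[i] = values[-1]
    values.pop()

vals = ["a", "b", "c", "d"]
swap_remove(vals, 1)   # intended effect: "b" is removed
print(vals)            # ['a', 'd', 'c']

# The hazard described in the ticket: if an edge case makes the
# implementation "do nothing" instead of swapping, the caller still
# believes index 1 was dropped, so "b" silently survives and can be
# re-evaluated and emitted again (e.g. as a spurious outer-join output).
```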
[jira] [Updated] (SPARK-37643) when charVarcharAsString is true, char datatype partition table query incorrect
[ https://issues.apache.org/jira/browse/SPARK-37643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-37643: --- Labels: correctness (was: ) > when charVarcharAsString is true, char datatype partition table query > incorrect > --- > > Key: SPARK-37643 > URL: https://issues.apache.org/jira/browse/SPARK-37643 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.0 > Environment: spark 3.2.0 >Reporter: YuanGuanhu >Assignee: YuanGuanhu >Priority: Major > Labels: correctness > Fix For: 3.1.3, 3.3.0, 3.2.2 > > > This ticket aims at fixing the bug where right-padding is not applied for char > type columns when charVarcharAsString is true and the partition data length is > less than the defined length. > For example, the query below returns nothing in master, but the correct result is > `abc`. > {code:java} > scala> sql("set spark.sql.legacy.charVarcharAsString=true") > scala> sql("CREATE TABLE tb01(i string, c char(5)) USING parquet partitioned > by (c)") > scala> sql("INSERT INTO tb01 values(1, 'abc')") > scala> sql("select c from tb01 where c = 'abc'").show > +---+ > | c| > +---+ > +---+{code} > This is because `ApplyCharTypePadding` rpads the expr to charLength. We should > handle this considering the conf spark.sql.legacy.charVarcharAsString value.
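The failing comparison can be modeled directly (an illustrative sketch, not Spark's actual rule): with charVarcharAsString=true the partition value is stored without padding, while ApplyCharTypePadding right-pads the literal to the declared char length, so the equality predicate never matches.

```python
CHAR_LEN = 5                            # the column is declared CHAR(5)

stored = "abc"                          # written unpadded when charVarcharAsString=true
padded_literal = "abc".ljust(CHAR_LEN)  # ApplyCharTypePadding rpads the literal

print(repr(padded_literal))             # 'abc  '
print(stored == padded_literal)         # False -> the partition row is filtered out
```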
[jira] [Updated] (SPARK-39702) Reduce memory overhead of TransportCipher$EncryptedMessage's byteRawChannel buffer
[ https://issues.apache.org/jira/browse/SPARK-39702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-39702: --- Component/s: YARN > Reduce memory overhead of TransportCipher$EncryptedMessage's byteRawChannel > buffer > -- > > Key: SPARK-39702 > URL: https://issues.apache.org/jira/browse/SPARK-39702 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > > With Spark's encryption enabled ({{spark.network.crypto.enabled}} set to > {{true}} and {{spark.network.crypto.saslFallback}} set to {{false}}), I > ran into memory usage problems in the external shuffle service. > This was caused by a problem that is very similar to SPARK-24801: each > {{TransportCipher$EncryptedMessage}} eagerly initializes a buffer that is > used during the encryption process. This buffer is only used once > {{transferTo}} is called, but it is eagerly initialized in the > {{EncryptedMessage}} constructor. This leads to high memory usage when there > are many messages queued in an outgoing channel. > One possible fix would be to mimic SPARK-24801 and make the initialization > lazy. However, we can actually go one step further and share a single re-used > buffer across multiple messages. This is safe because those messages are > _already_ sharing a different buffer which is accessed in the same write > paths.
[jira] [Created] (SPARK-39702) Reduce memory overhead of TransportCipher$EncryptedMessage's byteRawChannel buffer
Josh Rosen created SPARK-39702: -- Summary: Reduce memory overhead of TransportCipher$EncryptedMessage's byteRawChannel buffer Key: SPARK-39702 URL: https://issues.apache.org/jira/browse/SPARK-39702 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen Assignee: Josh Rosen With Spark's encryption enabled ({{spark.network.crypto.enabled}} set to {{true}} and {{spark.network.crypto.saslFallback}} set to {{false}}), I ran into memory usage problems in the external shuffle service. This was caused by a problem that is very similar to SPARK-24801: each {{TransportCipher$EncryptedMessage}} eagerly initializes a buffer that is used during the encryption process. This buffer is only used once {{transferTo}} is called, but it is eagerly initialized in the {{EncryptedMessage}} constructor. This leads to high memory usage when there are many messages queued in an outgoing channel. One possible fix would be to mimic SPARK-24801 and make the initialization lazy. However, we can actually go one step further and share a single re-used buffer across multiple messages. This is safe because those messages are _already_ sharing a different buffer which is accessed in the same write paths.
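The two ideas the ticket discusses, lazy initialization plus a single shared buffer, can be sketched as follows (hypothetical names; this is a model of the pattern, not Spark's actual TransportCipher code):

```python
class EncryptedMessage:
    # One staging buffer shared across all messages; per the ticket this is
    # safe because the messages already share another buffer accessed on the
    # same write paths.
    _shared_buffer = None

    def __init__(self, payload: bytes):
        # Nothing allocated here. The pre-fix code eagerly allocated a buffer
        # per message in the constructor, so a channel with many queued
        # messages held many idle buffers.
        self.payload = payload

    def transfer_to(self) -> int:
        # Lazy: the buffer first exists when a message is actually written.
        if EncryptedMessage._shared_buffer is None:
            EncryptedMessage._shared_buffer = bytearray(32 * 1024)
        buf = EncryptedMessage._shared_buffer
        n = min(len(self.payload), len(buf))
        buf[:n] = self.payload[:n]   # stand-in for the real encryption step
        return n

queued = [EncryptedMessage(b"x" * 128) for _ in range(10_000)]
assert EncryptedMessage._shared_buffer is None  # 10,000 queued messages, 0 buffers
queued[0].transfer_to()   # the first actual write allocates the one shared buffer
```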
[jira] [Updated] (SPARK-37865) Spark should not dedup the groupingExpressions when the first child of Union has duplicate columns
[ https://issues.apache.org/jira/browse/SPARK-37865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-37865: --- Labels: correctness (was: ) > Spark should not dedup the groupingExpressions when the first child of Union > has duplicate columns > -- > > Key: SPARK-37865 > URL: https://issues.apache.org/jira/browse/SPARK-37865 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Chao Gao >Assignee: Karen Feng >Priority: Major > Labels: correctness > Fix For: 3.1.3, 3.0.4, 3.3.0, 3.2.2 > > > When the first child of Union has duplicate columns, like select a, a from t1 > union select a, b from t2, Spark only uses the first column to aggregate the > results, which makes the results incorrect; this behavior is > inconsistent with other engines like PostgreSQL and MySQL. We could alias the > attributes of the first child of the union to resolve this, or one could argue > that this is a feature of Spark SQL. > sample query: > select > a, > a > from values (1, 1), (1, 2) as t1(a, b) > UNION > SELECT > a, > b > from values (1, 1), (1, 2) as t2(a, b) > result is > (1,1) > result from PostgreSQL and MySQL: > (1,1) > (1,2)
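The sample query's behavior is easy to model outside Spark (plain Python; illustrative only): correct UNION distinct-ness keys on both output columns, while deduplicating the grouping expressions down to the first column loses a row.

```python
t1 = [(1, 1), (1, 2)]
t2 = [(1, 1), (1, 2)]

first_child = [(a, a) for a, b in t1]    # SELECT a, a FROM t1
second_child = [(a, b) for a, b in t2]   # SELECT a, b FROM t2

# Correct UNION: distinct over both output columns (PostgreSQL/MySQL behavior).
correct = sorted(set(first_child + second_child))
print(correct)                           # [(1, 1), (1, 2)]

# The bug: because the first child's two columns are duplicates, the grouping
# expressions were deduped to just the first column -- effectively a DISTINCT
# keyed on column `a` alone, which drops (1, 2).
buggy = {}
for row in first_child + second_child:
    buggy.setdefault(row[0], row)        # key on first column only
print(sorted(buggy.values()))            # [(1, 1)]
```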
[jira] [Resolved] (SPARK-39489) Improve EventLoggingListener and ReplayListener performance by replacing Json4S ASTs with Jackson trees
[ https://issues.apache.org/jira/browse/SPARK-39489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-39489. Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 36885 [https://github.com/apache/spark/pull/36885] > Improve EventLoggingListener and ReplayListener performance by replacing > Json4S ASTs with Jackson trees > --- > > Key: SPARK-39489 > URL: https://issues.apache.org/jira/browse/SPARK-39489 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Fix For: 3.4.0 > > > Spark's event log JsonProtocol currently uses Json4s ASTs to generate and > parse JSON. Performance overheads from Json4s account for a significant > proportion of all time spent in JsonProtocol. If we replace Json4s usage with > direct usage of Jackson APIs then we can significantly improve performance > (~2x improvement for writing and reading in my own local microbenchmarks). > This performance improvement translates to faster history server load times > and reduced load on the Spark driver (and reduced likelihood of dropping > events because the listener cannot keep up, therefore reducing the likelihood > of inconsistent Spark UIs). > Reducing our usage of Json4s is also a step towards being able to eventually > remove our dependency on Json4s: Spark's current use of Json4s creates > library conflicts for end users who want to adopt Json4s 4 (see discussion on > PRs for SPARK-36408). If Spark can eventually remove its Json4s dependency > then we will completely eliminate such conflicts.
[jira] [Created] (SPARK-39658) Reconsider exposure of Json4s symbols in public ResourceInformation.toJson API
Josh Rosen created SPARK-39658: -- Summary: Reconsider exposure of Json4s symbols in public ResourceInformation.toJson API Key: SPARK-39658 URL: https://issues.apache.org/jira/browse/SPARK-39658 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen Followup to discussion at [https://github.com/apache/spark/pull/36885#discussion_r898644147] Spark's `ResourceInformation` class exposes Json4s symbols in its public API (via the toJson method). In the long run, I don't think that exposing third-party library symbols in our public APIs is a good idea. Perhaps we should make this API private or change it to require that users / implementors return an object which can be turned to JSON via Jackson ObjectMapper. In principle we still have some freedom to change this because the ResourceInformation class is marked as `@Evolving`. I'm filing this ticket so that I can reference it in TODO code comments.
[jira] [Resolved] (SPARK-39636) Fix multiple small bugs in JsonProtocol, impacting StorageLevel and Task/Executor resource requests
[ https://issues.apache.org/jira/browse/SPARK-39636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-39636. Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 37027 [https://github.com/apache/spark/pull/37027] > Fix multiple small bugs in JsonProtocol, impacting StorageLevel and > Task/Executor resource requests > --- > > Key: SPARK-39636 > URL: https://issues.apache.org/jira/browse/SPARK-39636 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Web UI >Affects Versions: 1.0.0 >Reporter: Josh Rosen >Assignee: Josh Rosen >Priority: Major > Fix For: 3.4.0 > > > While working on SPARK-39489, I spotted three pre-existing bugs in > JsonProtocol: > * {{TaskResourceRequest}} loses precision for values < 0.5. The {{amount}} > is a floating point number which is either between 0 and 0.5 or is a positive > integer, but the read path assumes it is an integer. > * {{ExecutorResourceRequest}} integer overflows for values larger than an > integer because the write path writes longs but the read path assumes > integers. > * The {{OFF_HEAP}} storage level is not handled properly: the {{offHeap}} > field isn't included in the JSON, so this StorageLevel cannot be > round-tripped through JSON. This could cause the History Server to display > inaccurate "off heap memory used" stats on the executors page. > I'm submitting a separate PR to fix those issues.
[jira] [Created] (SPARK-39636) Fix multiple small bugs in JsonProtocol, impacting StorageLevel and Task/Executor resource requests
Josh Rosen created SPARK-39636: -- Summary: Fix multiple small bugs in JsonProtocol, impacting StorageLevel and Task/Executor resource requests Key: SPARK-39636 URL: https://issues.apache.org/jira/browse/SPARK-39636 Project: Spark Issue Type: Improvement Components: Spark Core, Web UI Affects Versions: 1.0.0 Reporter: Josh Rosen Assignee: Josh Rosen While working on SPARK-39489, I spotted three pre-existing bugs in JsonProtocol: * {{TaskResourceRequest}} loses precision for values < 0.5. The {{amount}} is a floating point number which is either between 0 and 0.5 or is a positive integer, but the read path assumes it is an integer. * {{ExecutorResourceRequest}} integer overflows for values larger than an integer because the write path writes longs but the read path assumes integers. * The {{OFF_HEAP}} storage level is not handled properly: the {{offHeap}} field isn't included in the JSON, so this StorageLevel cannot be round-tripped through JSON. This could cause the History Server to display inaccurate "off heap memory used" stats on the executors page. I'm submitting a separate PR to fix those issues.
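The first two bugs are ordinary read-path type mismatches and are easy to reproduce outside Spark (an illustrative sketch, not JsonProtocol's actual code):

```python
import json

# 1) TaskResourceRequest: amount is either a fraction in (0, 0.5] or a
#    positive integer, but the read path coerced it to int.
amount = 0.25
round_tripped = int(json.loads(json.dumps({"amount": amount}))["amount"])
print(round_tripped)               # 0 -- the fractional request is lost

# 2) ExecutorResourceRequest: the write path emits a long, but the read
#    path truncates to a 32-bit int, overflowing for large amounts.
written = (1 << 32) + 5            # a value larger than a 32-bit int can hold
low = written & 0xFFFFFFFF
as_int32 = low - (1 << 32) if low >= (1 << 31) else low
print(as_int32)                    # 5 -- the high bits are silently dropped
```

The third bug (OFF_HEAP) is a missing field rather than a type mismatch: without an offHeap flag in the JSON, that storage level cannot be distinguished on read.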
[jira] [Commented] (SPARK-17728) UDFs are run too many times
[ https://issues.apache.org/jira/browse/SPARK-17728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557756#comment-17557756 ] Josh Rosen commented on SPARK-17728: As of SPARK-36718 in Spark 3.3 I think the {{explode(array(udf()))}} trick should no longer be needed: Spark will avoid collapsing projections which would lead to duplication of expensive-to-evaluate expressions. There still might be some rare cases where you might need that trick (e.g. to work around SPARK-38485), but I think most cases should be addressed by Spark 3.3's improved CollapseProject optimizer rule. > UDFs are run too many times > --- > > Key: SPARK-17728 > URL: https://issues.apache.org/jira/browse/SPARK-17728 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 > Environment: Databricks Cloud / Spark 2.0.0 >Reporter: Jacob Eisinger >Priority: Minor > Attachments: over_optimized_udf.html > > > h3. Background > Longer-running processes might run analytics or contact external > services from UDFs. The response might not just be a field, but instead a > structure of information. When attempting to break out this information, it > is critical that the query is optimized correctly. > h3. Steps to Reproduce > # Create some sample data. > # Create a UDF that returns multiple attributes. > # Run the UDF over some data. > # Create new columns from the multiple attributes. > # Observe the run time. > h3. Actual Results > The UDF is executed *multiple times* _per row._ > h3. Expected Results > The UDF should only be executed *once* _per row._ > h3. Workaround > Cache the Dataset after UDF execution. > h3. Details > For code and more details, see [^over_optimized_udf.html]
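The duplicated-evaluation behavior, and what the workaround achieves, can be modeled with a simple call counter (plain Python standing in for Catalyst's projection collapsing; not actual Spark APIs):

```python
calls = 0

def expensive_udf(x):
    global calls
    calls += 1
    return {"a": x + 1, "b": x * 2}   # a struct-like, multi-attribute result

rows = [1, 2, 3]

# Collapsed-projection shape (the pre-3.3 behavior the comment describes):
# extracting each field re-evaluates the UDF, once per extracted column per row.
collapsed = [(expensive_udf(x)["a"], expensive_udf(x)["b"]) for x in rows]
print(calls)   # 6 -> twice per row

# What explode(array(udf())) (or caching, or Spark 3.3's improved
# CollapseProject rule) achieves: materialize one result per row first.
calls = 0
materialized = [expensive_udf(x) for x in rows]
split = [(r["a"], r["b"]) for r in materialized]
print(calls)   # 3 -> once per row
```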
[jira] [Created] (SPARK-39489) Improve EventLoggingListener and ReplayListener performance by replacing Json4S ASTs with Jackson trees
Josh Rosen created SPARK-39489: -- Summary: Improve EventLoggingListener and ReplayListener performance by replacing Json4S ASTs with Jackson trees Key: SPARK-39489 URL: https://issues.apache.org/jira/browse/SPARK-39489 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Josh Rosen Assignee: Josh Rosen Spark's event log JsonProtocol currently uses Json4s ASTs to generate and parse JSON. Performance overheads from Json4s account for a significant proportion of all time spent in JsonProtocol. If we replace Json4s usage with direct usage of Jackson APIs then we can significantly improve performance (~2x improvement for writing and reading in my own local microbenchmarks). This performance improvement translates to faster history server load times and reduced load on the Spark driver (and reduced likelihood of dropping events because the listener cannot keep up, therefore reducing the likelihood of inconsistent Spark UIs). Reducing our usage of Json4s is also a step towards being able to eventually remove our dependency on Json4s: Spark's current use of Json4s creates library conflicts for end users who want to adopt Json4s 4 (see discussion on PRs for SPARK-36408). If Spark can eventually remove its Json4s dependency then we will completely eliminate such conflicts.
[jira] [Resolved] (SPARK-39465) Log4j version upgrade to 2.17.2
[ https://issues.apache.org/jira/browse/SPARK-39465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen resolved SPARK-39465. Resolution: Done > Log4j version upgrade to 2.17.2 > --- > > Key: SPARK-39465 > URL: https://issues.apache.org/jira/browse/SPARK-39465 > Project: Spark > Issue Type: Dependency upgrade > Components: Java API >Affects Versions: 3.2.1 > Environment: Production >Reporter: Chethan G B >Priority: Major > > Hi Team, > There were talks about upgrading log4j to latest version available as part of > security fix. > Wanted to know, if it is already upgraded. > > Note: We are using below dependencies, > > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-core_2.12</artifactId> > <version>3.0.1</version> > </dependency> > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-sql_2.12</artifactId> > <version>3.0.1</version> > </dependency> > > Kindly let us know when the log4j upgrade will be available for users ?
[jira] [Reopened] (SPARK-39465) Log4j version upgrade to 2.17.2
[ https://issues.apache.org/jira/browse/SPARK-39465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen reopened SPARK-39465: > Log4j version upgrade to 2.17.2 > --- > > Key: SPARK-39465 > URL: https://issues.apache.org/jira/browse/SPARK-39465 > Project: Spark > Issue Type: Dependency upgrade > Components: Java API >Affects Versions: 3.2.1 > Environment: Production >Reporter: Chethan G B >Priority: Major > > Hi Team, > There were talks about upgrading log4j to latest version available as part of > security fix. > Wanted to know, if it is already upgraded. > > Note: We are using below dependencies, > > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-core_2.12</artifactId> > <version>3.0.1</version> > </dependency> > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-sql_2.12</artifactId> > <version>3.0.1</version> > </dependency> > > Kindly let us know when the log4j upgrade will be available for users ?
[jira] [Comment Edited] (SPARK-39465) Log4j version upgrade to 2.17.2
[ https://issues.apache.org/jira/browse/SPARK-39465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554844#comment-17554844 ] Josh Rosen edited comment on SPARK-39465 at 6/16/22 1:21 AM: - Spark uses Log4J 2.x starting in Spark 3.3.0+; see SPARK-37814. The migration from Log4J 1.x to Log4J 2.x is too large of a change for us to backport to existing Spark versions (see [related discussion on another ticket|https://issues.apache.org/jira/browse/SPARK-37883?focusedCommentId=17481521=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17481521]). As a result, if you want to use Log4J 2.x then you will need to upgrade to Spark 3.3.0. The [Spark 3.3.0 release vote just passed yesterday|https://lists.apache.org/thread/zg6k1spw6k1c7brgo6t7qldvsqbmfytm], so the release should be published in the next couple of days. was (Author: joshrosen): Spark uses Log4J 2.x starting in Spark 3.3.0+; see SPARK-37814. The migration from Log4J 1.x to Log4J 2.x is too large of a change for us to backport to existing Spark versions (see related discussion on another ticket). As a result, if you want to use Log4J 2.x then you will need to upgrade to Spark 3.3.0. The [Spark 3.3.0 release vote just passed yesterday|https://lists.apache.org/thread/zg6k1spw6k1c7brgo6t7qldvsqbmfytm], so the release should be published in the next couple of days. > Log4j version upgrade to 2.17.2 > --- > > Key: SPARK-39465 > URL: https://issues.apache.org/jira/browse/SPARK-39465 > Project: Spark > Issue Type: Dependency upgrade > Components: Java API >Affects Versions: 3.2.1 > Environment: Production >Reporter: Chethan G B >Priority: Major > > Hi Team, > There were talks about upgrading log4j to latest version available as part of > security fix. > Wanted to know, if it is already upgraded.
> > Note: We are using below dependencies, > > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-core_2.12</artifactId> > <version>3.0.1</version> > </dependency> > <dependency> > <groupId>org.apache.spark</groupId> > <artifactId>spark-sql_2.12</artifactId> > <version>3.0.1</version> > </dependency> > > Kindly let us know when the log4j upgrade will be available for users ?