[jira] [Updated] (SPARK-44649) Runtime Filter supports passing equivalent creation side expressions
[ https://issues.apache.org/jira/browse/SPARK-44649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-44649:
-----------------------------------
    Labels: pull-request-available  (was: )

> Runtime Filter supports passing equivalent creation side expressions
> --------------------------------------------------------------------
>
>                 Key: SPARK-44649
>                 URL: https://issues.apache.org/jira/browse/SPARK-44649
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Jiaan Geng
>            Priority: Major
>              Labels: pull-request-available
>
> {code:java}
> SELECT
>   d_year,
>   i_brand_id,
>   i_class_id,
>   i_category_id,
>   i_manufact_id,
>   cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
>   cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
> FROM catalog_sales
> JOIN item ON i_item_sk = cs_item_sk
> JOIN date_dim ON d_date_sk = cs_sold_date_sk
> LEFT JOIN catalog_returns ON (cs_order_number = cr_order_number
>   AND cs_item_sk = cr_item_sk)
> WHERE i_category = 'Books'
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
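The feature title suggests that equality join conditions like those in the query above (e.g. `cs_item_sk = cr_item_sk` and `i_item_sk = cs_item_sk`) define equivalence classes, so a runtime filter built on one expression could be reused for any equivalent one. A minimal, hypothetical Scala sketch of that idea (not Spark's implementation; `EquivalentExprs` and its method names are made up for illustration):

```scala
// Hypothetical sketch: equality join conditions form an undirected graph of
// expressions; a runtime filter created on one attribute can, in principle,
// be applied to anything in the same connected component.
object EquivalentExprs {
  // Collect every expression reachable from `start` via equality edges.
  def equivalents(eqEdges: Seq[(String, String)], start: String): Set[String] = {
    val undirected = eqEdges ++ eqEdges.map(_.swap)
    val adj: Map[String, Set[String]] =
      undirected.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toSet }
    @annotation.tailrec
    def bfs(frontier: Set[String], seen: Set[String]): Set[String] =
      if (frontier.isEmpty) seen
      else {
        val next = frontier.flatMap(e => adj.getOrElse(e, Set.empty[String])) -- seen
        bfs(next, seen ++ next)
      }
    bfs(Set(start), Set(start))
  }

  def main(args: Array[String]): Unit = {
    // Edges taken from the example query's join conditions.
    val edges = Seq(
      "i_item_sk" -> "cs_item_sk",
      "d_date_sk" -> "cs_sold_date_sk",
      "cs_order_number" -> "cr_order_number",
      "cs_item_sk" -> "cr_item_sk")
    // A filter built on cs_item_sk would also apply to i_item_sk and cr_item_sk.
    println(EquivalentExprs.equivalents(edges, "cs_item_sk"))
  }
}
```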
[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table
[ https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-45449:
-----------------------------------
    Labels: pull-request-available  (was: )

> Cache Invalidation Issue with JDBC Table
> ----------------------------------------
>
>                 Key: SPARK-45449
>                 URL: https://issues.apache.org/jira/browse/SPARK-45449
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: liangyongyuan
>            Priority: Major
>              Labels: pull-request-available
>
> We have identified a cache invalidation issue when caching JDBC tables in
> Spark SQL. The cached table is unexpectedly invalidated when queried, leading
> to a re-read from the JDBC table instead of retrieving the data from the cache.
>
> Example SQL:
> {code:java}
> CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
> SELECT * FROM cache_t;
> {code}
>
> Expected Behavior:
> Querying the cached table (cache_t) should retrieve the result from the
> cache without re-evaluating the execution plan.
>
> Actual Behavior:
> The cache is invalidated, and the content is re-read from the JDBC table.
>
> Root Cause:
> The issue lies in the 'CacheData' class, where the comparison involves
> 'JDBCTable'. 'JDBCTable' is a case class:
> {code:java}
> case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
> {code}
> The comparison of non-case-class components, such as 'jdbcOptions', falls
> back to pointer comparison. This leads to unnecessary cache invalidation.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
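The root cause above can be reproduced outside Spark. Scala case-class equality compares fields with `==`, and when a field's class does not override `equals` (as with `JDBCOptions`), that comparison degrades to reference equality. A minimal sketch, using made-up stand-in classes `Options` and `Table` rather than Spark's own:

```scala
// Stand-in for JDBCOptions: a regular class with no equals/hashCode override.
class Options(val url: String)

// Stand-in for JDBCTable: a case class embedding the non-case-class field.
case class Table(name: String, options: Options)

object CacheKeyDemo {
  def main(args: Array[String]): Unit = {
    val a = Table("t1", new Options("jdbc:mysql://host/test"))
    val b = Table("t1", new Options("jdbc:mysql://host/test"))
    // Logically identical, but unequal: `options` is compared by pointer,
    // so a cache lookup keyed on such a value misses and the cache is dropped.
    println(a == b)        // false
    println(a == a.copy()) // true: copy() reuses the same Options reference
  }
}
```

This is why the comparison "involves pointer comparison": the case class machinery is fine, but it can only delegate to whatever `equals` its fields provide.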
[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table
[ https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liangyongyuan updated SPARK-45449: -- Summary: Cache Invalidation Issue with JDBC Table (was: Cache Invalidation Issue with JDBC Table in Spark SQL) > Cache Invalidation Issue with JDBC Table > > > Key: SPARK-45449 > URL: https://issues.apache.org/jira/browse/SPARK-45449 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: liangyongyuan >Priority: Major > > We have identified a cache invalidation issue when caching JDBC tables in > Spark SQL. The cached table is unexpectedly invalidated when queried, leading > to a re-read from the JDBC table instead of retrieving data from the cache. > Example SQL: > {code:java} > CACHE TABLE cache_t SELECT * FROM mysql.test.test1; > SELECT * FROM cache_t; > {code} > Expected Behavior: > The expectation is that querying the cached table (cache_t) should retrieve > the result from the cache without re-evaluating the execution plan. > Actual Behavior: > However, the cache is invalidated, and the content is re-read from the JDBC > table. > Root Cause: > The issue lies in the 'CacheData' class, where the comparison involves > 'JDBCTable.' The 'JDBCTable' is a case class: > {code:java} > case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: > JDBCOptions) > {code} > The comparison of non-case class components, such as 'jdbcOptions,' involves > pointer comparison. This leads to unnecessary cache invalidation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liangyongyuan updated SPARK-45449: -- Summary: Cache Invalidation Issue with JDBC Table in Spark SQL (was: Unable to cache reading jdbc data) > Cache Invalidation Issue with JDBC Table in Spark SQL > - > > Key: SPARK-45449 > URL: https://issues.apache.org/jira/browse/SPARK-45449 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: liangyongyuan >Priority: Major > > We have identified a cache invalidation issue when caching JDBC tables in > Spark SQL. The cached table is unexpectedly invalidated when queried, leading > to a re-read from the JDBC table instead of retrieving data from the cache. > Example SQL: > {code:java} > CACHE TABLE cache_t SELECT * FROM mysql.test.test1; > SELECT * FROM cache_t; > {code} > Expected Behavior: > The expectation is that querying the cached table (cache_t) should retrieve > the result from the cache without re-evaluating the execution plan. > Actual Behavior: > However, the cache is invalidated, and the content is re-read from the JDBC > table. > Root Cause: > The issue lies in the 'CacheData' class, where the comparison involves > 'JDBCTable.' The 'JDBCTable' is a case class: > {code:java} > case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: > JDBCOptions) > {code} > The comparison of non-case class components, such as 'jdbcOptions,' involves > pointer comparison. This leads to unnecessary cache invalidation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45450) Fix imports according to PEP8: pyspark.pandas and pyspark (core)
Hyukjin Kwon created SPARK-45450: Summary: Fix imports according to PEP8: pyspark.pandas and pyspark (core) Key: SPARK-45450 URL: https://issues.apache.org/jira/browse/SPARK-45450 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 4.0.0 Reporter: Hyukjin Kwon https://peps.python.org/pep-0008/#imports -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45449) Unable to cache reading jdbc data
liangyongyuan created SPARK-45449: - Summary: Unable to cache reading jdbc data Key: SPARK-45449 URL: https://issues.apache.org/jira/browse/SPARK-45449 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: liangyongyuan We have identified a cache invalidation issue when caching JDBC tables in Spark SQL. The cached table is unexpectedly invalidated when queried, leading to a re-read from the JDBC table instead of retrieving data from the cache. Example SQL: {code:java} CACHE TABLE cache_t SELECT * FROM mysql.test.test1; SELECT * FROM cache_t; {code} Expected Behavior: The expectation is that querying the cached table (cache_t) should retrieve the result from the cache without re-evaluating the execution plan. Actual Behavior: However, the cache is invalidated, and the content is re-read from the JDBC table. Root Cause: The issue lies in the 'CacheData' class, where the comparison involves 'JDBCTable.' The 'JDBCTable' is a case class: {code:java} case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions) {code} The comparison of non-case class components, such as 'jdbcOptions,' involves pointer comparison. This leads to unnecessary cache invalidation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45448) Fix imports according to PEP8: pyspark.testing, pyspark.mllib, pyspark.resource and pyspark.streaming
Hyukjin Kwon created SPARK-45448: Summary: Fix imports according to PEP8: pyspark.testing, pyspark.mllib, pyspark.resource and pyspark.streaming Key: SPARK-45448 URL: https://issues.apache.org/jira/browse/SPARK-45448 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 4.0.0 Reporter: Hyukjin Kwon https://peps.python.org/pep-0008/#imports -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45447) Reduce the memory required to start the LocalClusterSparkContext in the mllib module test cases. #43242
Yang Jie created SPARK-45447: Summary: Reduce the memory required to start the LocalClusterSparkContext in the mllib module test cases. #43242 Key: SPARK-45447 URL: https://issues.apache.org/jira/browse/SPARK-45447 Project: Spark Issue Type: Improvement Components: MLlib, Tests Affects Versions: 4.0.0 Reporter: Yang Jie -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45445) Upgrade snappy to 1.1.10.5
[ https://issues.apache.org/jira/browse/SPARK-45445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45445: --- Labels: pull-request-available (was: ) > Upgrade snappy to 1.1.10.5 > -- > > Key: SPARK-45445 > URL: https://issues.apache.org/jira/browse/SPARK-45445 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 4.0.0 >Reporter: BingKun Pan >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45445) Upgrade snappy to 1.1.10.5
BingKun Pan created SPARK-45445: --- Summary: Upgrade snappy to 1.1.10.5 Key: SPARK-45445 URL: https://issues.apache.org/jira/browse/SPARK-45445 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 4.0.0 Reporter: BingKun Pan -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772729#comment-17772729 ]

XiDuo You commented on SPARK-45443:
-----------------------------------
hi [~erenavsarogullari], it seems that it depends on the behavior of the RDD cache. Say, what happens if we materialize a cached RDD twice at the same time? There are some race conditions in the block manager per RDD partition, which makes things slow. BTW, what was the behavior before we had TableCacheQueryStage? Didn't it have this issue?

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats used to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), which may cause replicated IMR materialization. This behavior is more visible when the cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
>
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show()
> {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3)
> {code}
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
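The race described in the issue is a classic check-then-act pattern: each stage tests an "is materialized" flag, and if the first materialization is still running, the second stage also passes the check and submits its own job. A deterministic Scala sketch of that pattern (using a latch as a stand-in for "the first job is still running"; the names here are illustrative, not Spark's):

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the check-then-act race: two "stages" both observe
// materialized == false before either finishes, so both submit a job.
object DoubleMaterializeDemo {
  @volatile private var materialized = false
  private val jobsSubmitted = new AtomicInteger(0)
  private val bothChecked = new CountDownLatch(2)

  private def materializeIfNeeded(): Unit =
    if (!materialized) {    // both threads can pass this check...
      bothChecked.countDown()
      bothChecked.await()   // deterministic stand-in for a slow first job
      jobsSubmitted.incrementAndGet() // ...and both submit a job
      materialized = true
    }

  def run(): Int = {
    val t1 = new Thread(() => materializeIfNeeded())
    val t2 = new Thread(() => materializeIfNeeded())
    t1.start(); t2.start(); t1.join(); t2.join()
    jobsSubmitted.get
  }

  def main(args: Array[String]): Unit =
    println(s"materialization jobs submitted: ${run()}") // prints 2
}
```

Avoiding the duplicate work would mean making the check and the job submission atomic per cached plan, or having the second stage wait on the first stage's materialization future.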
[jira] [Updated] (SPARK-42309) Assign name to _LEGACY_ERROR_TEMP_1204
[ https://issues.apache.org/jira/browse/SPARK-42309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-42309: --- Labels: pull-request-available (was: ) > Assign name to _LEGACY_ERROR_TEMP_1204 > -- > > Key: SPARK-42309 > URL: https://issues.apache.org/jira/browse/SPARK-42309 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > Labels: pull-request-available > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45444) Upgrade `commons-io` to 1.24.0
[ https://issues.apache.org/jira/browse/SPARK-45444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45444: --- Labels: pull-request-available (was: ) > Upgrade `commons-io` to 1.24.0 > -- > > Key: SPARK-45444 > URL: https://issues.apache.org/jira/browse/SPARK-45444 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 4.0.0 >Reporter: BingKun Pan >Priority: Trivial > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45444) Upgrade `commons-io` to 1.24.0
BingKun Pan created SPARK-45444: --- Summary: Upgrade `commons-io` to 1.24.0 Key: SPARK-45444 URL: https://issues.apache.org/jira/browse/SPARK-45444 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 4.0.0 Reporter: BingKun Pan -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45442) Refine docstring of `DataFrame.show`
[ https://issues.apache.org/jira/browse/SPARK-45442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45442: --- Labels: pull-request-available (was: ) > Refine docstring of `DataFrame.show` > > > Key: SPARK-45442 > URL: https://issues.apache.org/jira/browse/SPARK-45442 > Project: Spark > Issue Type: Sub-task > Components: Documentation, PySpark >Affects Versions: 4.0.0 >Reporter: Allison Wang >Priority: Major > Labels: pull-request-available > > Refine docstring of `DataFrame.show()` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations into remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. 
cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- AdaptiveSparkPlan (7) : +- HashAggregate (6) : +- Exchange (5) : +- HashAggregate (4) : +- LocalTableScan (3) +- * Sort (18) +- * Filter (17) +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5) +- InMemoryTableScan (11) +- InMemoryRelation (12) +- AdaptiveSparkPlan (15) +- HashAggregate (14) +- Exchange (13) +- HashAggregate (4) +- LocalTableScan (3) {code} *Stages DAGs materializing the same IMR instance:* !IMR Materialization - Stage 2.png|width=303,height=134! !IMR Materialization - Stage 3.png|width=303,height=134! was: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. 
For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) :
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. 
cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- AdaptiveSparkPlan (7) : +- HashAggregate (6) : +- Exchange (5) : +- HashAggregate (4) : +- LocalTableScan (3) +- * Sort (18) +- * Filter (17) +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5) +- InMemoryTableScan (11) +- InMemoryRelation (12) +- AdaptiveSparkPlan (15) +- HashAggregate (14) +- Exchange (13) +- HashAggregate (4) +- LocalTableScan (3) {code} *Stages DAGs materializing the same IMR instance:* !IMR Materialization - Stage 2.png|width=303,height=134! !IMR Materialization - Stage 3.png|width=303,height=134! was: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. 
For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +-
[jira] [Updated] (SPARK-42775) approx_percentile produces wrong results for large decimals.
[ https://issues.apache.org/jira/browse/SPARK-42775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-42775: --- Labels: pull-request-available (was: ) > approx_percentile produces wrong results for large decimals. > > > Key: SPARK-42775 > URL: https://issues.apache.org/jira/browse/SPARK-42775 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0, 3.0.0, 3.1.0, 3.2.0, 3.3.0, > 3.4.0 >Reporter: Chenhao Li >Priority: Major > Labels: pull-request-available > > In the {{approx_percentile}} expression, Spark casts decimal to double to > update the aggregation state > ([ApproximatePercentile.scala#L181|https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L181]) > and casts the result double back to decimal > ([ApproximatePercentile.scala#L206|https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L206]). > The precision loss in the casts can make the result decimal out of its > precision range. This can lead to the following counter-intuitive results: > {code:sql} > spark-sql> select approx_percentile(col, 0.5) from values > (999) as tab(col); > NULL > spark-sql> select approx_percentile(col, 0.5) is null from values > (999) as tab(col); > false > spark-sql> select cast(approx_percentile(col, 0.5) as string) from values > (999) as tab(col); > 1000 > spark-sql> desc select approx_percentile(col, 0.5) from values > (999) as tab(col); > approx_percentile(col, 0.5, 1)decimal(19,0) > {code} > The result is actually not null, so the second query returns false. The first > query returns null because the result cannot fit into {{{}decimal(19, 0){}}}. 
> A suggested fix is to use {{Decimal.changePrecision}} here to ensure the
> result fits, and to return null or throw an exception when the result does
> not fit.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
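The decimal-through-double round trip described in the issue can be shown in a few lines of plain Scala: the 19-nines value is not representable as a double, the nearest double is exactly 1.0e19, and converting back yields a 20-digit value that no longer fits decimal(19,0). A minimal sketch:

```scala
// Sketch of the precision loss: decimal -> double -> decimal can inflate a
// value past its original precision bound, as in the approx_percentile state.
object DecimalViaDoubleDemo {
  def main(args: Array[String]): Unit = {
    val original  = BigDecimal("9999999999999999999") // 19 digits: max of decimal(19,0)
    val viaDouble = BigDecimal(original.toDouble)     // nearest double is exactly 1.0e19
    println(viaDouble.toBigInt)   // 10000000000000000000: 20 digits, overflows decimal(19,0)
    println(viaDouble > original) // true: the round trip inflated the value
  }
}
```

This is why the result "cannot fit into decimal(19, 0)" even though it is not null: the widened value exceeds the declared precision only after the cast back.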
[jira] [Commented] (SPARK-45440) Incorrect summary counts from a CSV file
[ https://issues.apache.org/jira/browse/SPARK-45440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772724#comment-17772724 ]

Bruce Robbins commented on SPARK-45440:
---------------------------------------
I added {{inferSchema=true}} as a datasource option in your example and I got the expected answer. Otherwise it's doing a max and min on a string (not a number).

> Incorrect summary counts from a CSV file
> ----------------------------------------
>
>                 Key: SPARK-45440
>                 URL: https://issues.apache.org/jira/browse/SPARK-45440
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 3.5.0
>         Environment: Pyspark version 3.5.0
>            Reporter: Evan Volgas
>            Priority: Major
>              Labels: aggregation, bug, pyspark
>
> I am using pip-installed Pyspark version 3.5.0 inside the context of an
> IPython shell. The task is straightforward: take [this CSV file|https://gist.githubusercontent.com/evanvolgas/e5cb082673ec947239658291f2251de4/raw/a9c5e9866ac662a816f9f3828a2d184032f604f0/AAPL.csv]
> of AAPL stock prices and compute the minimum and maximum volume-weighted
> average price for the entire file.
> My code is [here|https://gist.github.com/evanvolgas/e4aa75fec4179bb7075a5283867f127c].
> I've also performed the same computation in DuckDB because I noticed that the
> results of the Spark code are wrong. Literally the exact same SQL in DuckDB
> and in Spark yields different results, and Spark's are wrong.
> I have never seen this behavior in a Spark release before. I'm very confused
> by it, and curious whether anyone else can replicate this behavior.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
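The comment's diagnosis (min/max over a string column) comes down to lexicographic ordering: without schema inference the CSV values stay strings, and string comparison goes character by character, so "900" sorts above "131000". A tiny Scala sketch with made-up volume values:

```scala
// Why inferSchema matters: string min/max is lexicographic, not numeric.
object StringMinMaxDemo {
  def main(args: Array[String]): Unit = {
    val volumes = Seq("900", "25000", "131000") // hypothetical CSV column read as strings
    println(volumes.max)                 // "900": '9' > '2' > '1', numerically wrong
    println(volumes.map(_.toLong).max)   // 131000: correct once cast to a number
  }
}
```

The same effect explains the wrong summary counts in the report: DuckDB inferred numeric types from the CSV, while Spark (without {{inferSchema=true}}) aggregated strings.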
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (the cached RDD) to provide runtime stats for applying AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization. This behavior becomes more visible as the cached RDD size grows. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] I would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]
*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
df4.show()
{code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :           +- InMemoryRelation (2)
         :              +- AdaptiveSparkPlan (7)
         :                 +- HashAggregate (6)
         :                    +- Exchange (5)
         :                       +- HashAggregate (4)
         :                          +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                     +- InMemoryRelation (12)
                        +- AdaptiveSparkPlan (15)
                           +- HashAggregate (14)
                              +- Exchange (13)
                                 +- HashAggregate (4)
                                    +- LocalTableScan (3)
{code}
*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!
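The issue described above is a check-then-act race on `inMemoryTableScan.isMaterialized`: the second stage checks before the first finishes, so both submit a job. A minimal sketch in plain Scala (hypothetical classes, not Spark's actual `TableCacheQueryStage` API) of the intended once-only behavior, where materialization of a shared relation is guarded so the expensive job is submitted at most once:

```scala
import java.util.concurrent.atomic.AtomicInteger

object MaterializationSketch {

  // Stands in for a shared InMemoryRelation: materializes its data at most once,
  // even if several query stages request it (possibly concurrently).
  final class SharedRelation {
    val jobsSubmitted = new AtomicInteger(0)
    private var cached: Option[Seq[Int]] = None

    def materialize(): Seq[Int] = synchronized {
      cached match {
        case Some(data) => data                 // already materialized: reuse it
        case None =>
          jobsSubmitted.incrementAndGet()       // the expensive "job submission"
          val data = (1 to 5).toSeq
          cached = Some(data)
          data
      }
    }
  }

  // Simulates two stages referencing the same cached relation, as the two
  // join legs do in the sample query. Returns the number of jobs submitted.
  def run(): Int = {
    val shared = new SharedRelation
    val left  = shared.materialize()            // first join leg
    val right = shared.materialize()            // second join leg
    require(left == right)
    shared.jobsSubmitted.get()                  // 1, not 2
  }
}
```

Without the guard, both `materialize()` calls would do the work independently, which is the replicated-materialization symptom reported here.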
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Attachment: IMR Materialization - Stage 3.png IMR Materialization - Stage 2.png
> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> ---------------------------------------------------------------------------------
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (the cached RDD) to provide runtime stats for applying AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance.
> For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
>
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
>
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :           +- InMemoryRelation (2)
>          :              +- AdaptiveSparkPlan (7)
>          :                 +- HashAggregate (6)
>          :                    +- Exchange (5)
>          :                       +- HashAggregate (4)
>          :                          +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                      +- InMemoryRelation (12)
>                         +- AdaptiveSparkPlan (15)
>                            +- HashAggregate (14)
>                               +- Exchange (13)
>                                  +- HashAggregate (4)
>                                     +- LocalTableScan (3) {code}
> Replicated Stages DAG:
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats used to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
df4.show()
{code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3)
{code}
Replicated Stages DAG:
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Summary: Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization (was: Revisit TableCacheQueryStage to avoid replicated IMR materialization)
> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Eren Avsarogullari
> Priority: Major
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats used to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance.
> For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated IMR materialization
Eren Avsarogullari created SPARK-45443: -- Summary: Revisit TableCacheQueryStage to avoid replicated IMR materialization Key: SPARK-45443 URL: https://issues.apache.org/jira/browse/SPARK-45443 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: Eren Avsarogullari
TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats used to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
df4.show()
{code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
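The race described above can be sketched with a toy model (the names below are illustrative, not Spark's actual classes): two stages observe the not-yet-materialized flag before either materialization job finishes, so both submit a job for the same cached relation.

```python
# Toy model of the reported race: two TableCacheQueryStage instances share one
# cached relation, and each submits its own materialization job when it sees
# a stale "not materialized" flag. Names are illustrative only.

class SharedCachedRelation:
    def __init__(self):
        self.materialized = False
        self.jobs_submitted = 0

    def is_materialized(self):
        return self.materialized

    def submit_materialization_job(self):
        # In Spark this would be a real job computing the cached RDD.
        self.jobs_submitted += 1
        self.materialized = True

def stage_materialize(relation, seen_materialized):
    # Each stage decides based on the (possibly stale) flag it observed.
    if not seen_materialized:
        relation.submit_materialization_job()

relation = SharedCachedRelation()
# Both stages check the flag *before* either job completes (the slow first
# materialization described in the issue), so both observe False...
seen_by_stage_1 = relation.is_materialized()
seen_by_stage_2 = relation.is_materialized()
stage_materialize(relation, seen_by_stage_1)
stage_materialize(relation, seen_by_stage_2)
# ...and the shared relation is materialized twice.
print(relation.jobs_submitted)  # 2
```

Sharing a single memoized materialization future per cached relation, rather than a per-stage check of the flag, would collapse the two submissions into one.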
[jira] [Resolved] (SPARK-45425) Mapped TINYINT to ShortType for MsSqlServerDialect
[ https://issues.apache.org/jira/browse/SPARK-45425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang resolved SPARK-45425. Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 43232 [https://github.com/apache/spark/pull/43232]
> Mapped TINYINT to ShortType for MsSqlServerDialect
> --
>
> Key: SPARK-45425
> URL: https://issues.apache.org/jira/browse/SPARK-45425
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Zerui Bao
> Assignee: Zerui Bao
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
>
> TINYINT in SQL Server is not correctly mapped to Spark types when using the JDBC connector. Currently it is mapped to IntegerType, which is not accurate; it should be mapped to ShortType.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45425) Mapped TINYINT to ShortType for MsSqlServerDialect
[ https://issues.apache.org/jira/browse/SPARK-45425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang reassigned SPARK-45425: -- Assignee: Zerui Bao
> Mapped TINYINT to ShortType for MsSqlServerDialect
> --
>
> Key: SPARK-45425
> URL: https://issues.apache.org/jira/browse/SPARK-45425
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Zerui Bao
> Assignee: Zerui Bao
> Priority: Major
> Labels: pull-request-available
>
> TINYINT in SQL Server is not correctly mapped to Spark types when using the JDBC connector. Currently it is mapped to IntegerType, which is not accurate; it should be mapped to ShortType.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
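For context on why ShortType is the right target: SQL Server's TINYINT is an unsigned 8-bit type (0 to 255), so values 128-255 overflow a signed byte, and a signed 16-bit short is the smallest type that holds the full range. A minimal pure-Python sketch of that range argument (illustrative only, not Spark's dialect code):

```python
# SQL Server TINYINT is unsigned 8-bit; Catalyst has no unsigned integer types.
TINYINT_RANGE = range(0, 256)        # 0..255 (SQL Server TINYINT)
BYTE_RANGE = range(-128, 128)        # signed 8-bit (Catalyst ByteType)
SHORT_RANGE = range(-32768, 32768)   # signed 16-bit (Catalyst ShortType)

# ByteType cannot hold every TINYINT value (128..255 overflow)...
fits_byte = all(v in BYTE_RANGE for v in TINYINT_RANGE)
# ...but ShortType can, so it is the smallest lossless mapping.
fits_short = all(v in SHORT_RANGE for v in TINYINT_RANGE)

print(fits_byte, fits_short)  # False True
```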
[jira] [Created] (SPARK-45442) Refine docstring of `DataFrame.show`
Allison Wang created SPARK-45442: Summary: Refine docstring of `DataFrame.show` Key: SPARK-45442 URL: https://issues.apache.org/jira/browse/SPARK-45442 Project: Spark Issue Type: Sub-task Components: Documentation, PySpark Affects Versions: 4.0.0 Reporter: Allison Wang Refine docstring of `DataFrame.show()` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45441) Introduce more util functions for PythonWorkerUtils
[ https://issues.apache.org/jira/browse/SPARK-45441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45441: --- Labels: pull-request-available (was: ) > Introduce more util functions for PythonWorkerUtils > --- > > Key: SPARK-45441 > URL: https://issues.apache.org/jira/browse/SPARK-45441 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 4.0.0 >Reporter: Takuya Ueshin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45441) Introduce more util functions for PythonWorkerUtils
Takuya Ueshin created SPARK-45441: - Summary: Introduce more util functions for PythonWorkerUtils Key: SPARK-45441 URL: https://issues.apache.org/jira/browse/SPARK-45441 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 4.0.0 Reporter: Takuya Ueshin -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45440) Incorrect summary counts from a CSV file
Evan Volgas created SPARK-45440: --- Summary: Incorrect summary counts from a CSV file Key: SPARK-45440 URL: https://issues.apache.org/jira/browse/SPARK-45440 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 3.5.0 Environment: PySpark version 3.5.0 Reporter: Evan Volgas
I am using pip-installed PySpark version 3.5.0 inside the context of an IPython shell. The task is straightforward: take [this CSV file|https://gist.githubusercontent.com/evanvolgas/e5cb082673ec947239658291f2251de4/raw/a9c5e9866ac662a816f9f3828a2d184032f604f0/AAPL.csv] of AAPL stock prices and compute the minimum and maximum volume weighted average price for the entire file. My code is [here|https://gist.github.com/evanvolgas/e4aa75fec4179bb7075a5283867f127c]. I've also performed the same computation in DuckDB because I noticed that the results of the Spark code are wrong. Literally, the exact same SQL in DuckDB and in Spark yields different results, and Spark's are wrong. I have never seen this behavior in a Spark release before. I'm very confused by it, and curious if anyone else can replicate this behavior.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
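For reference, volume-weighted average price is sum(price * volume) / sum(volume) per group, and the report is about the min/max over those group VWAPs. A pure-Python sketch of one plausible shape of that computation (with made-up rows, not the linked AAPL file or the reporter's exact SQL):

```python
# (date, close_price, volume) -- made-up sample rows, not the linked AAPL data.
rows = [
    ("2023-01-03", 125.07, 112_117_500),
    ("2023-01-03", 126.36, 89_113_600),
    ("2023-01-04", 126.36, 80_962_700),
    ("2023-01-04", 129.62, 70_790_800),
]

def vwap(group):
    # Volume-weighted average price: sum(price * volume) / sum(volume).
    return sum(p * v for _, p, v in group) / sum(v for _, p, v in group)

# Group rows by date, then take min/max of the per-group VWAPs.
by_date = {}
for row in rows:
    by_date.setdefault(row[0], []).append(row)

daily_vwap = {d: vwap(g) for d, g in by_date.items()}
lo, hi = min(daily_vwap.values()), max(daily_vwap.values())
assert lo <= hi
```

A reference implementation like this (or the DuckDB run the reporter mentions) gives a ground truth to compare Spark's aggregation output against.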
[jira] [Updated] (SPARK-45438) Decimal precision exceeds max precision error when using unary minus on min Decimal values on Scala 2.13 Spark
[ https://issues.apache.org/jira/browse/SPARK-45438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Navin Kumar updated SPARK-45438: Summary: Decimal precision exceeds max precision error when using unary minus on min Decimal values on Scala 2.13 Spark (was: Decimal precision exceeds max precision error when using unary minus on min Decimal values)
> Decimal precision exceeds max precision error when using unary minus on min Decimal values on Scala 2.13 Spark
> --
>
> Key: SPARK-45438
> URL: https://issues.apache.org/jira/browse/SPARK-45438
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0, 3.2.1, 3.3.0, 3.2.2, 3.3.1, 3.2.3, 3.2.4, 3.3.3, 3.3.2, 3.4.0, 3.4.1, 3.5.0
> Reporter: Navin Kumar
> Priority: Major
> Labels: scala
>
> When submitting an application to Spark built with Scala 2.13, there are issues with Decimal overflow that show up when using unary minus (and also {{abs()}}, which uses unary minus under the hood).
> Here is an example PySpark reproduction:
> {code}
> from decimal import Decimal
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, DecimalType
>
> spark = SparkSession.builder \
>     .master("local[*]") \
>     .appName("decimal_precision") \
>     .config("spark.rapids.sql.explain", "ALL") \
>     .config("spark.sql.ansi.enabled", "true") \
>     .config("spark.sql.legacy.allowNegativeScaleOfDecimal", 'true') \
>     .getOrCreate()
>
> precision = 38
> scale = 0
> DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
> data = [[DECIMAL_MIN]]
> schema = StructType([
>     StructField("a", DecimalType(precision, scale), True)])
>
> df = spark.createDataFrame(data=data, schema=schema)
> df.selectExpr("a", "-a").show()
> {code}
> This particular example will run successfully on Spark built with Scala 2.12, but throw a java.math.ArithmeticException on Spark built with Scala 2.13.
> If you change the value of {{DECIMAL_MIN}} in the previous code to something just above the original DECIMAL_MIN, you will not get an exception thrown, but instead you will get an incorrect answer (possibly due to overflow):
> {code}
> ...
> DECIMAL_MIN = Decimal('-8' + ('9' * (precision-1)) + 'e' + str(-scale))
> ...
> {code}
> Output:
> {code}
> +--------------------+--------------------+
> |                   a|               (- a)|
> +--------------------+--------------------+
> |-8999...|9...|
> +--------------------+--------------------+
> {code}
> It looks like the code in {{Decimal.scala}} uses {{scala.math.BigDecimal}}. See https://github.com/scala/bug/issues/11590 with updates on how Scala 2.13 handles BigDecimal. It looks like there is a {{java.math.MathContext}} missing when performing these operations.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45438) Decimal precision exceeds max precision error when using unary minus on min Decimal values
Navin Kumar created SPARK-45438: --- Summary: Decimal precision exceeds max precision error when using unary minus on min Decimal values Key: SPARK-45438 URL: https://issues.apache.org/jira/browse/SPARK-45438 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0, 3.4.1, 3.4.0, 3.3.2, 3.3.3, 3.2.4, 3.2.3, 3.3.1, 3.2.2, 3.3.0, 3.2.1, 3.2.0 Reporter: Navin Kumar
When submitting an application to Spark built with Scala 2.13, there are issues with Decimal overflow that show up when using unary minus (and also {{abs()}}, which uses unary minus under the hood).
Here is an example PySpark reproduction:
{code}
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DecimalType

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("decimal_precision") \
    .config("spark.rapids.sql.explain", "ALL") \
    .config("spark.sql.ansi.enabled", "true") \
    .config("spark.sql.legacy.allowNegativeScaleOfDecimal", 'true') \
    .getOrCreate()

precision = 38
scale = 0
DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
data = [[DECIMAL_MIN]]
schema = StructType([
    StructField("a", DecimalType(precision, scale), True)])

df = spark.createDataFrame(data=data, schema=schema)
df.selectExpr("a", "-a").show()
{code}
This particular example will run successfully on Spark built with Scala 2.12, but throw a java.math.ArithmeticException on Spark built with Scala 2.13.
If you change the value of {{DECIMAL_MIN}} in the previous code to something just above the original DECIMAL_MIN, you will not get an exception thrown, but instead you will get an incorrect answer (possibly due to overflow):
{code}
...
DECIMAL_MIN = Decimal('-8' + ('9' * (precision-1)) + 'e' + str(-scale))
...
{code}
Output:
{code}
+--------------------+--------------------+
|                   a|               (- a)|
+--------------------+--------------------+
|-8999...|9...|
+--------------------+--------------------+
{code}
It looks like the code in {{Decimal.scala}} uses {{scala.math.BigDecimal}}. See https://github.com/scala/bug/issues/11590 with updates on how Scala 2.13 handles BigDecimal. It looks like there is a {{java.math.MathContext}} missing when performing these operations.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45426) [CORE] Add support for a ReloadingTrustManager
[ https://issues.apache.org/jira/browse/SPARK-45426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45426: --- Labels: pull-request-available (was: ) > [CORE] Add support for a ReloadingTrustManager > -- > > Key: SPARK-45426 > URL: https://issues.apache.org/jira/browse/SPARK-45426 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Hasnain Lakhani >Priority: Major > Labels: pull-request-available > > For the RPC SSL feature, this allows us to properly reload the trust store if > needed at runtime. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
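The general shape of a reloading trust manager is a wrapper that re-reads its backing file when the trust material changes on disk (e.g. certificate rotation). A minimal pure-Python sketch of that pattern, with hypothetical names and no relation to Spark's actual implementation:

```python
# Sketch of a reloading trust-store wrapper: re-read the file when its
# modification time changes, instead of serving a stale in-memory copy.
import os

class ReloadingTrustStore:
    def __init__(self, path):
        self.path = path
        self._mtime_ns = None
        self._contents = None

    def get(self):
        mtime_ns = os.stat(self.path).st_mtime_ns
        if mtime_ns != self._mtime_ns:
            # Trust material changed on disk: reload it.
            with open(self.path, "rb") as f:
                self._contents = f.read()
            self._mtime_ns = mtime_ns
        return self._contents
```

A real TrustManager would additionally rebuild the TLS context from the reloaded key material; the mtime check above only captures the reload trigger.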
[jira] [Updated] (SPARK-44639) Add option to use Java tmp dir for RocksDB state store
[ https://issues.apache.org/jira/browse/SPARK-44639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-44639: --- Labels: pull-request-available (was: )
> Add option to use Java tmp dir for RocksDB state store
> --
>
> Key: SPARK-44639
> URL: https://issues.apache.org/jira/browse/SPARK-44639
> Project: Spark
> Issue Type: Improvement
> Components: SQL, Structured Streaming
> Affects Versions: 3.4.1
> Reporter: Adam Binford
> Priority: Major
> Labels: pull-request-available
>
> Currently local RocksDB state is stored in a local directory given by Utils.getLocalDir. On YARN this is a directory created inside the root application folder, such as /tmp/nm-local-dir/usercache//appcache//
> The problem with this is that if an executor crashes for some reason (like OOM) and the shutdown hooks don't get run, this directory will stay around forever until the application finishes, which can cause jobs to slowly accumulate more and more temporary space until finally the node manager goes unhealthy.
> Because this data will only ever be accessed by the executor that created this directory, it would make sense to store the data inside the container folder, which will always get cleaned up by the node manager when that YARN container gets cleaned up. YARN sets `java.io.tmpdir` to a path inside this directory, such as /tmp/nm-local-dir/usercache//appcache///tmp/
> I'm not sure of the behavior for other resource managers, so this could be an opt-in config.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
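The proposed opt-in can be sketched as a simple directory-resolution rule: when the flag is set, use the JVM's java.io.tmpdir (which YARN points inside the container directory, cleaned up with the container even after a crash); otherwise keep the current application-scoped local dir. The config key name below is hypothetical, invented for illustration:

```python
# Sketch of the proposed opt-in directory choice for RocksDB working state.
# The config key is hypothetical; only the decision logic is illustrated.
import os
import tempfile

def rocksdb_local_root(conf: dict) -> str:
    if conf.get("spark.sql.streaming.stateStore.rocksdb.useJavaTmpDir", "false") == "true":
        # Analogous to java.io.tmpdir: inside the container dir on YARN, so the
        # node manager removes it with the container even if shutdown hooks
        # never run.
        return tempfile.gettempdir()
    # Default: application-scoped local dir; leaked on executor crash until
    # the whole application finishes.
    return conf.get("spark.local.dir", os.path.join("/tmp", "nm-local-dir"))
```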
[jira] [Created] (SPARK-45437) Upgrade SNAPPY to 1.1.10.5 to pick up fix re Linux PLE64
N Campbell created SPARK-45437: -- Summary: Upgrade SNAPPY to 1.1.10.5 to pick up fix re Linux PLE64 Key: SPARK-45437 URL: https://issues.apache.org/jira/browse/SPARK-45437 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.5.0 Reporter: N Campbell
SPARK-45323 moved to Snappy 1.1.10.4, and that version is proposed for inclusion in Spark 3.5.1. However, Snappy prior to 1.1.10.5 will not work on Linux PLE64. Moving to Snappy 1.1.10.5 addresses that issue: https://github.com/xerial/snappy-java/pull/515
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40513) SPIP: Support Docker Official Image for Spark
[ https://issues.apache.org/jira/browse/SPARK-40513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-40513: --- Labels: SPIP pull-request-available (was: SPIP)
> SPIP: Support Docker Official Image for Spark
> -
>
> Key: SPARK-40513
> URL: https://issues.apache.org/jira/browse/SPARK-40513
> Project: Spark
> Issue Type: New Feature
> Components: Kubernetes, Spark Docker
> Affects Versions: 3.5.0
> Reporter: Yikun Jiang
> Assignee: Yikun Jiang
> Priority: Major
> Labels: SPIP, pull-request-available
> Fix For: 3.5.0
>
> This SPIP proposes to add a [Docker Official Image (DOI)|https://github.com/docker-library/official-images] to ensure the Spark Docker images meet the quality standards for Docker images, and to provide these images for users who want to use Apache Spark via Docker.
> There are also several [Apache projects that release Docker Official Images|https://hub.docker.com/search?q=apache_filter=official], such as: [flink|https://hub.docker.com/_/flink], [storm|https://hub.docker.com/_/storm], [solr|https://hub.docker.com/_/solr], [zookeeper|https://hub.docker.com/_/zookeeper], [httpd|https://hub.docker.com/_/httpd] (with 50M+ to 1B+ downloads each). From the huge download statistics, we can see the real demand from users, and from the support of other Apache projects, we should also be able to do it.
> After support:
> * The Dockerfile will still be maintained by the Apache Spark community and reviewed by Docker.
> * The images will be maintained by the Docker community to ensure they meet the quality standards for Docker images of the Docker community.
> It will also reduce the extra Docker image maintenance effort (such as frequent rebuilding and image security updates) of the Apache Spark community.
>
> SPIP DOC: [https://docs.google.com/document/d/1nN-pKuvt-amUcrkTvYAQ-bJBgtsWb9nAkNoVNRM2S2o]
> DISCUSS: [https://lists.apache.org/thread/l1793y5224n8bqkp3s6ltgkykso4htb3]
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45435) Document that lazy checkpoint may not be a consistent snapshot
[ https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juliusz Sompolski updated SPARK-45435: -- Summary: Document that lazy checkpoint may not be a consistent snapshot (was: Document that lazy checkpoint may cause non-determinism)
> Document that lazy checkpoint may not be a consistent snapshot
> -
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
> Issue Type: Documentation
> Components: Spark Core, SQL
> Affects Versions: 4.0.0
> Reporter: Juliusz Sompolski
> Priority: Major
> Labels: pull-request-available
>
> Some people may want to use checkpoint to get a consistent snapshot of the Dataset / RDD. Warn that this is not the case with lazy checkpoint, because the checkpoint is computed only at the end of the first action, and the data used during the first action may be different because of non-determinism and retries.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45436) DataFrame methods check same session
[ https://issues.apache.org/jira/browse/SPARK-45436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45436: --- Labels: pull-request-available (was: ) > DataFrame methods check same session > > > Key: SPARK-45436 > URL: https://issues.apache.org/jira/browse/SPARK-45436 > Project: Spark > Issue Type: Improvement > Components: Connect, PySpark >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45435) Document that lazy checkpoint may cause non-determinism
[ https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45435: --- Labels: pull-request-available (was: )
> Document that lazy checkpoint may cause non-determinism
> --
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
> Issue Type: Documentation
> Components: Spark Core, SQL
> Affects Versions: 4.0.0
> Reporter: Juliusz Sompolski
> Priority: Major
> Labels: pull-request-available
>
> Some people may want to use checkpoint to get a consistent snapshot of the Dataset / RDD. Warn that this is not the case with lazy checkpoint, because the checkpoint is computed only at the end of the first action, and the data used during the first action may be different because of non-determinism and retries.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45436) DataFrame methods check same session
Ruifeng Zheng created SPARK-45436: - Summary: DataFrame methods check same session Key: SPARK-45436 URL: https://issues.apache.org/jira/browse/SPARK-45436 Project: Spark Issue Type: Improvement Components: Connect, PySpark Affects Versions: 4.0.0 Reporter: Ruifeng Zheng -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45435) Document that lazy checkpoint may cause non-determinism
Juliusz Sompolski created SPARK-45435: - Summary: Document that lazy checkpoint may cause non-determinism Key: SPARK-45435 URL: https://issues.apache.org/jira/browse/SPARK-45435 Project: Spark Issue Type: Documentation Components: Spark Core, SQL Affects Versions: 4.0.0 Reporter: Juliusz Sompolski
Some people may want to use checkpoint to get a consistent snapshot of the Dataset / RDD. Warn that this is not the case with lazy checkpoint, because the checkpoint is computed only at the end of the first action, and the data used during the first action may be different because of non-determinism and retries.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
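The consistency pitfall above can be illustrated with a toy pure-Python model (not Spark code): an eager snapshot pins the data before any action runs, while a lazy one is just a recipe, so a non-deterministic source produces different data each time the recipe is evaluated before materialization completes.

```python
# Toy illustration of eager vs. lazy snapshots over a non-deterministic
# source. Pure Python; stands in for rand(), task retries, changing input.
import itertools

_counter = itertools.count()

def non_deterministic_source():
    return [next(_counter) for _ in range(3)]

# Eager snapshot: pin the data first, then run actions against the pinned copy.
eager_snapshot = non_deterministic_source()
action_1 = list(eager_snapshot)
action_2 = list(eager_snapshot)
assert action_1 == action_2                  # consistent across actions

# Lazy snapshot: before materialization it is only a recipe, so each
# evaluation (e.g. the first action vs. a retried partition) recomputes.
lazy_recipe = non_deterministic_source
first_action_data = lazy_recipe()            # data seen while materializing
recomputation = lazy_recipe()                # e.g. a retry
assert first_action_data != recomputation    # not a consistent snapshot
```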
[jira] [Resolved] (SPARK-45434) LogisticRegression checks the training labels
[ https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng resolved SPARK-45434. --- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 43246 [https://github.com/apache/spark/pull/43246] > LogisticRegression checks the training labels > - > > Key: SPARK-45434 > URL: https://issues.apache.org/jira/browse/SPARK-45434 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45434) LogisticRegression checks the training labels
[ https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng reassigned SPARK-45434: - Assignee: Ruifeng Zheng > LogisticRegression checks the training labels > - > > Key: SPARK-45434 > URL: https://issues.apache.org/jira/browse/SPARK-45434 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error
[ https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot reassigned SPARK-45433: -- Assignee: Apache Spark
> CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0, 3.4.0, 3.5.0
> Reporter: Jia Fan
> Assignee: Apache Spark
> Priority: Major
> Labels: pull-request-available
>
> CSV/JSON schema inference when timestamps do not match the specified timestampFormat with `only one row on each partition` reports an error.
> {code:java}
> //eg
> val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> //error
> Caused by: java.time.format.DateTimeParseException: Text '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19 {code}
> This bug affects 3.3/3.4/3.5. Unlike https://issues.apache.org/jira/browse/SPARK-45424 , this is a different bug but has the same error message
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error
[ https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot reassigned SPARK-45433: -- Assignee: (was: Apache Spark)
> CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0, 3.4.0, 3.5.0
> Reporter: Jia Fan
> Priority: Major
> Labels: pull-request-available
>
> CSV/JSON schema inference when timestamps do not match the specified timestampFormat with `only one row on each partition` reports an error.
> {code:java}
> //eg
> val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> //error
> Caused by: java.time.format.DateTimeParseException: Text '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19 {code}
> This bug affects 3.3/3.4/3.5. Unlike https://issues.apache.org/jira/browse/SPARK-45424 , this is a different bug but has the same error message
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
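The failure mode has a direct pure-Python analogue: a timestamp pattern without fractional seconds parses the first 19 characters of '2884-06-24T02:45:51.138' and then chokes on the trailing '.138', just like the `unparsed text found at index 19` error above. Schema inference is expected to fall back (e.g. to StringType) on such mismatches rather than raise.

```python
# Pure-Python analogue of the mismatch: a strict format leaves ".138"
# unparsed, mirroring Spark's "unparsed text found at index 19".
from datetime import datetime

ts = "2884-06-24T02:45:51.138"
strict_fmt = "%Y-%m-%dT%H:%M:%S"   # ~ yyyy-MM-dd'T'HH:mm:ss (no fraction)

try:
    datetime.strptime(ts, strict_fmt)
    strict_parse_ok = True
except ValueError:
    # strptime reports: "unconverted data remains: .138"
    strict_parse_ok = False

assert not strict_parse_ok

# A format that handles the fractional part parses the same text fine.
parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f")
assert (parsed.year, parsed.microsecond) == (2884, 138000)
```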
[jira] [Updated] (SPARK-45434) LogisticRegression checks the training labels
[ https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated SPARK-45434: --- Labels: pull-request-available (was: ) > LogisticRegression checks the training labels > - > > Key: SPARK-45434 > URL: https://issues.apache.org/jira/browse/SPARK-45434 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45434) Validate the training labels
Ruifeng Zheng created SPARK-45434: - Summary: Validate the training labels Key: SPARK-45434 URL: https://issues.apache.org/jira/browse/SPARK-45434 Project: Spark Issue Type: Improvement Components: ML Affects Versions: 4.0.0 Reporter: Ruifeng Zheng -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45434) LogisticRegression checks the training labels
[ https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng updated SPARK-45434: -- Summary: LogisticRegression checks the training labels (was: LogisticRegression check the training labels) > LogisticRegression checks the training labels > - > > Key: SPARK-45434 > URL: https://issues.apache.org/jira/browse/SPARK-45434 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45434) LogisticRegression check the training labels
[ https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng updated SPARK-45434: -- Summary: LogisticRegression check the training labels (was: Validate the training labels) > LogisticRegression check the training labels > > > Key: SPARK-45434 > URL: https://issues.apache.org/jira/browse/SPARK-45434 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45432) Remove deprecated Hadoop-2 `LocatedFileStatus` constructor
[ https://issues.apache.org/jira/browse/SPARK-45432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-45432: - Assignee: Dongjoon Hyun > Remove deprecated Hadoop-2 `LocatedFileStatus` constructor > -- > > Key: SPARK-45432 > URL: https://issues.apache.org/jira/browse/SPARK-45432 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-45432) Remove deprecated Hadoop-2 `LocatedFileStatus` constructor
[ https://issues.apache.org/jira/browse/SPARK-45432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-45432. --- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 43239 [https://github.com/apache/spark/pull/43239] > Remove deprecated Hadoop-2 `LocatedFileStatus` constructor > -- > > Key: SPARK-45432 > URL: https://issues.apache.org/jira/browse/SPARK-45432 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 4.0.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-45357) Maven test `SparkConnectProtoSuite` failed
[ https://issues.apache.org/jira/browse/SPARK-45357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng resolved SPARK-45357. --- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 43155 [https://github.com/apache/spark/pull/43155] > Maven test `SparkConnectProtoSuite` failed > -- > > Key: SPARK-45357 > URL: https://issues.apache.org/jira/browse/SPARK-45357 > Project: Spark > Issue Type: Bug > Components: Connect >Affects Versions: 4.0.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > > > build/mvn clean install -pl connector/connect/server -am -DskipTests > mvn test -pl connector/connect/server > > {code:java} > - Test observe *** FAILED *** > == FAIL: Plans do not match === > !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, > sum(id#0) AS sum(id)#0L], 0 CollectMetrics my_metric, [min(id#0) AS > min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53 > +- LocalRelation , [id#0, name#0] > +- LocalRelation , [id#0, name#0] > (PlanTest.scala:179) {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-45357) Maven test `SparkConnectProtoSuite` failed
[ https://issues.apache.org/jira/browse/SPARK-45357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng reassigned SPARK-45357: - Assignee: Yang Jie > Maven test `SparkConnectProtoSuite` failed > -- > > Key: SPARK-45357 > URL: https://issues.apache.org/jira/browse/SPARK-45357 > Project: Spark > Issue Type: Bug > Components: Connect >Affects Versions: 4.0.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Major > Labels: pull-request-available > > > build/mvn clean install -pl connector/connect/server -am -DskipTests > mvn test -pl connector/connect/server > > {code:java} > - Test observe *** FAILED *** > == FAIL: Plans do not match === > !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, > sum(id#0) AS sum(id)#0L], 0 CollectMetrics my_metric, [min(id#0) AS > min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53 > +- LocalRelation , [id#0, name#0] > +- LocalRelation , [id#0, name#0] > (PlanTest.scala:179) {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
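The `Test observe` failure quoted above comes down to a single differing token: the `CollectMetrics` node carries id `0` in the expected plan and `53` in the actual one, while the rest of the two plans is identical. A hypothetical Python sketch (Spark's actual `PlanTest` normalization is Scala and works on plan trees, not strings) illustrating how masking such a non-deterministic id makes the two plan strings compare equal:

```python
import re

plan_expected = ("CollectMetrics my_metric, [min(id#0) AS min_val#0, "
                 "max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0")
plan_actual = ("CollectMetrics my_metric, [min(id#0) AS min_val#0, "
               "max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53")

def normalize(plan: str) -> str:
    # Replace the trailing CollectMetrics id (a fresh counter that differs
    # between test runs and build environments) with a placeholder.
    return re.sub(r"(CollectMetrics .*\]), \d+", r"\1, <id>", plan)

print(normalize(plan_expected) == normalize(plan_actual))  # True
```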