[jira] [Updated] (SPARK-44649) Runtime Filter supports passing equivalent creation side expressions

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-44649:
---
Labels: pull-request-available  (was: )

> Runtime Filter supports passing equivalent creation side expressions
> 
>
> Key: SPARK-44649
> URL: https://issues.apache.org/jira/browse/SPARK-44649
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Jiaan Geng
>Priority: Major
>  Labels: pull-request-available
>
> {code:java}
> SELECT
>   d_year,
>   i_brand_id,
>   i_class_id,
>   i_category_id,
>   i_manufact_id,
>   cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
>   cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
> FROM catalog_sales
>   JOIN item ON i_item_sk = cs_item_sk
>   JOIN date_dim ON d_date_sk = cs_sold_date_sk
>   LEFT JOIN catalog_returns ON (cs_order_number = cr_order_number
> AND cs_item_sk = cr_item_sk)
> WHERE i_category = 'Books'
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45449:
---
Labels: pull-request-available  (was: )

> Cache Invalidation Issue with JDBC Table
> 
>
> Key: SPARK-45449
> URL: https://issues.apache.org/jira/browse/SPARK-45449
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: liangyongyuan
>Priority: Major
>  Labels: pull-request-available
>
> We have identified a cache invalidation issue when caching JDBC tables in 
> Spark SQL. The cached table is unexpectedly invalidated when queried, leading 
> to a re-read from the JDBC table instead of retrieving data from the cache.
> Example SQL:
> {code:java}
> CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
> SELECT * FROM cache_t;
> {code}
> Expected Behavior:
> The expectation is that querying the cached table (cache_t) should retrieve 
> the result from the cache without re-evaluating the execution plan.
> Actual Behavior:
> However, the cache is invalidated, and the content is re-read from the JDBC 
> table.
> Root Cause:
> The issue lies in the 'CacheData' class, where the comparison involves 
> 'JDBCTable.' The 'JDBCTable' is a case class:
> {code:java}
> case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: 
> JDBCOptions)
> {code}
> The comparison of non-case class components, such as 'jdbcOptions,' involves 
> pointer comparison. This leads to unnecessary cache invalidation.
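
A minimal, self-contained sketch of the equality pitfall described above (editorial illustration, not code from the ticket): the Options class below is a stand-in for JDBCOptions, a plain class with no equals of its own, so the generated equality of the enclosing case class falls back to reference comparison for that field.
{code:java}
// Stand-in for JDBCOptions: a plain class, no equals/hashCode override.
class Options(val parameters: Map[String, String])

// Stand-in for JDBCTable: a case class whose generated equals compares the
// Options field by reference, not by its contents.
case class Table(name: String, options: Options)

object CacheKeySketch {
  def main(args: Array[String]): Unit = {
    val params = Map("url" -> "jdbc:mysql://host/test", "dbtable" -> "test1")
    val cached   = Table("test1", new Options(params))
    val resolved = Table("test1", new Options(params))

    // false: a freshly resolved relation never matches the cached entry,
    // so the cache lookup misses and the JDBC table is re-read.
    println(cached == resolved)

    // A hedged fix in the spirit of the ticket: compare the underlying
    // option parameters instead of the wrapper object's identity.
    println(cached.name == resolved.name &&
      cached.options.parameters == resolved.options.parameters) // true
  }
}
{code}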



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table

2023-10-06 Thread liangyongyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liangyongyuan updated SPARK-45449:
--
Summary: Cache Invalidation Issue with JDBC Table  (was: Cache Invalidation 
Issue with JDBC Table in Spark SQL)

> Cache Invalidation Issue with JDBC Table
> 
>
> Key: SPARK-45449
> URL: https://issues.apache.org/jira/browse/SPARK-45449
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: liangyongyuan
>Priority: Major
>
> We have identified a cache invalidation issue when caching JDBC tables in 
> Spark SQL. The cached table is unexpectedly invalidated when queried, leading 
> to a re-read from the JDBC table instead of retrieving data from the cache.
> Example SQL:
> {code:java}
> CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
> SELECT * FROM cache_t;
> {code}
> Expected Behavior:
> The expectation is that querying the cached table (cache_t) should retrieve 
> the result from the cache without re-evaluating the execution plan.
> Actual Behavior:
> However, the cache is invalidated, and the content is re-read from the JDBC 
> table.
> Root Cause:
> The issue lies in the 'CacheData' class, where the comparison involves 
> 'JDBCTable.' The 'JDBCTable' is a case class:
> {code:java}
> case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: 
> JDBCOptions)
> {code}
> The comparison of non-case class components, such as 'jdbcOptions,' involves 
> pointer comparison. This leads to unnecessary cache invalidation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45449) Cache Invalidation Issue with JDBC Table in Spark SQL

2023-10-06 Thread liangyongyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liangyongyuan updated SPARK-45449:
--
Summary: Cache Invalidation Issue with JDBC Table in Spark SQL  (was: 
Unable to cache reading jdbc data)

> Cache Invalidation Issue with JDBC Table in Spark SQL
> -
>
> Key: SPARK-45449
> URL: https://issues.apache.org/jira/browse/SPARK-45449
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: liangyongyuan
>Priority: Major
>
> We have identified a cache invalidation issue when caching JDBC tables in 
> Spark SQL. The cached table is unexpectedly invalidated when queried, leading 
> to a re-read from the JDBC table instead of retrieving data from the cache.
> Example SQL:
> {code:java}
> CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
> SELECT * FROM cache_t;
> {code}
> Expected Behavior:
> The expectation is that querying the cached table (cache_t) should retrieve 
> the result from the cache without re-evaluating the execution plan.
> Actual Behavior:
> However, the cache is invalidated, and the content is re-read from the JDBC 
> table.
> Root Cause:
> The issue lies in the 'CacheData' class, where the comparison involves 
> 'JDBCTable.' The 'JDBCTable' is a case class:
> {code:java}
> case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: 
> JDBCOptions)
> {code}
> The comparison of non-case class components, such as 'jdbcOptions,' involves 
> pointer comparison. This leads to unnecessary cache invalidation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45450) Fix imports according to PEP8: pyspark.pandas and pyspark (core)

2023-10-06 Thread Hyukjin Kwon (Jira)
Hyukjin Kwon created SPARK-45450:


 Summary: Fix imports according to PEP8: pyspark.pandas and pyspark 
(core)
 Key: SPARK-45450
 URL: https://issues.apache.org/jira/browse/SPARK-45450
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 4.0.0
Reporter: Hyukjin Kwon


https://peps.python.org/pep-0008/#imports



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45449) Unable to cache reading jdbc data

2023-10-06 Thread liangyongyuan (Jira)
liangyongyuan created SPARK-45449:
-

 Summary: Unable to cache reading jdbc data
 Key: SPARK-45449
 URL: https://issues.apache.org/jira/browse/SPARK-45449
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.5.0
Reporter: liangyongyuan


We have identified a cache invalidation issue when caching JDBC tables in Spark 
SQL. The cached table is unexpectedly invalidated when queried, leading to a 
re-read from the JDBC table instead of retrieving data from the cache.
Example SQL:

{code:java}
CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
SELECT * FROM cache_t;
{code}

Expected Behavior:
The expectation is that querying the cached table (cache_t) should retrieve the 
result from the cache without re-evaluating the execution plan.

Actual Behavior:
However, the cache is invalidated, and the content is re-read from the JDBC 
table.

Root Cause:
The issue lies in the 'CacheData' class, where the comparison involves 
'JDBCTable.' The 'JDBCTable' is a case class:
{code:java}
case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: 
JDBCOptions)
{code}

The comparison of non-case class components, such as 'jdbcOptions,' involves 
pointer comparison. This leads to unnecessary cache invalidation.






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45448) Fix imports according to PEP8: pyspark.testing, pyspark.mllib, pyspark.resource and pyspark.streaming

2023-10-06 Thread Hyukjin Kwon (Jira)
Hyukjin Kwon created SPARK-45448:


 Summary: Fix imports according to PEP8: pyspark.testing, 
pyspark.mllib, pyspark.resource and pyspark.streaming
 Key: SPARK-45448
 URL: https://issues.apache.org/jira/browse/SPARK-45448
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 4.0.0
Reporter: Hyukjin Kwon


https://peps.python.org/pep-0008/#imports



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45447) Reduce the memory required to start the LocalClusterSparkContext in the mllib module test cases. #43242

2023-10-06 Thread Yang Jie (Jira)
Yang Jie created SPARK-45447:


 Summary: Reduce the memory required to start the 
LocalClusterSparkContext in the mllib module test cases. #43242
 Key: SPARK-45447
 URL: https://issues.apache.org/jira/browse/SPARK-45447
 Project: Spark
  Issue Type: Improvement
  Components: MLlib, Tests
Affects Versions: 4.0.0
Reporter: Yang Jie






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45445) Upgrade snappy to 1.1.10.5

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45445:
---
Labels: pull-request-available  (was: )

> Upgrade snappy to 1.1.10.5
> --
>
> Key: SPARK-45445
> URL: https://issues.apache.org/jira/browse/SPARK-45445
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 4.0.0
>Reporter: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45445) Upgrade snappy to 1.1.10.5

2023-10-06 Thread BingKun Pan (Jira)
BingKun Pan created SPARK-45445:
---

 Summary: Upgrade snappy to 1.1.10.5
 Key: SPARK-45445
 URL: https://issues.apache.org/jira/browse/SPARK-45445
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 4.0.0
Reporter: BingKun Pan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread XiDuo You (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772729#comment-17772729
 ] 

XiDuo You commented on SPARK-45443:
---

hi [~erenavsarogullari], it seems that it depends on the behavior of the RDD 
cache. Say, what happens if we materialize a cached RDD twice at the same time? 
There is some race condition in the block manager per RDD partition, so it makes 
things slow. BTW, what was the behavior before we had TableCacheQueryStage? 
Did it not have this issue?

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Eren Avsarogullari
>Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - 
> Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats that AQE uses to optimize the remaining 
> physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
> eagerly by submitting a job per TableCacheQueryStage instance. For example, if 
> there are 2 TableCacheQueryStage instances referencing the same IMR instance 
> (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, 
> the following logic will return false (inMemoryTableScan.isMaterialized => 
> false), which may cause replicated IMR materialization. This behavior is more 
> visible when the cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i => {
> val index = i % 5
> (index, s"Employee_$index", s"Department_$index")
>   }
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!
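
A hedged, Spark-free sketch of the race described in the ticket (the names, thread pool, and timing below are illustrative assumptions, not Spark internals; only the isMaterialized check itself is quoted from the issue): both stages check a shared "materialized" flag before the first materialization job has finished, so both end up submitting a job for the same cached RDD.
{code:java}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object ImrRaceSketch {
  // Illustrative stand-ins for the materialization state of one shared IMR.
  private val materialized  = new AtomicBoolean(false)
  private val jobsSubmitted = new AtomicInteger(0)

  // Mirrors the isMaterialized check quoted above: while the first stage's job
  // is still running, the flag is still false, so the second stage submits a
  // duplicate materialization job for the same cached RDD.
  private def materializeIfNeeded(): Unit = {
    if (!materialized.get()) {
      jobsSubmitted.incrementAndGet()
      Thread.sleep(100) // simulate a slow cache build
      materialized.set(true)
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2) // the two TableCacheQueryStage instances
    (1 to 2).foreach(_ => pool.submit(new Runnable { def run(): Unit = materializeIfNeeded() }))
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    println(s"materialization jobs submitted: ${jobsSubmitted.get()}") // typically 2
  }
}
{code}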



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42309) Assign name to _LEGACY_ERROR_TEMP_1204

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-42309:
---
Labels: pull-request-available  (was: )

> Assign name to _LEGACY_ERROR_TEMP_1204
> --
>
> Key: SPARK-42309
> URL: https://issues.apache.org/jira/browse/SPARK-42309
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45444) Upgrade `commons-io` to 1.24.0

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45444:
---
Labels: pull-request-available  (was: )

> Upgrade `commons-io` to 1.24.0
> --
>
> Key: SPARK-45444
> URL: https://issues.apache.org/jira/browse/SPARK-45444
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 4.0.0
>Reporter: BingKun Pan
>Priority: Trivial
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45444) Upgrade `commons-io` to 1.24.0

2023-10-06 Thread BingKun Pan (Jira)
BingKun Pan created SPARK-45444:
---

 Summary: Upgrade `commons-io` to 1.24.0
 Key: SPARK-45444
 URL: https://issues.apache.org/jira/browse/SPARK-45444
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 4.0.0
Reporter: BingKun Pan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45442) Refine docstring of `DataFrame.show`

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45442:
---
Labels: pull-request-available  (was: )

> Refine docstring of `DataFrame.show`
> 
>
> Key: SPARK-45442
> URL: https://issues.apache.org/jira/browse/SPARK-45442
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, PySpark
>Affects Versions: 4.0.0
>Reporter: Allison Wang
>Priority: Major
>  Labels: pull-request-available
>
> Refine docstring of `DataFrame.show()`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats in order to apply AQE  optimizations into 
remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage 
instance. For example, if there are 2 TableCacheQueryStage instances 
referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s 
materialization takes longer, following logic will return false 
(inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR 
materialization. This behavior can be more visible when cached RDD size is high.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats in order to apply AQE  optimizations onto 
remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage 
instance. For example, if there are 2 TableCacheQueryStage instances 
referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s 
materialization takes longer, following logic will return false 
(inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR 
materialization. This behavior can be more visible when cached RDD size is high.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :       

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats in order to apply AQE  optimizations onto 
remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage 
instance. For example, if there are 2 TableCacheQueryStage instances 
referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s 
materialization takes longer, following logic will return false 
(inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR 
materialization. This behavior can be more visible when cached RDD size is high.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization. This behavior can be more 
visible when cached RDD size is high.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- 

[jira] [Updated] (SPARK-42775) approx_percentile produces wrong results for large decimals.

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-42775:
---
Labels: pull-request-available  (was: )

> approx_percentile produces wrong results for large decimals.
> 
>
> Key: SPARK-42775
> URL: https://issues.apache.org/jira/browse/SPARK-42775
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0, 3.0.0, 3.1.0, 3.2.0, 3.3.0, 
> 3.4.0
>Reporter: Chenhao Li
>Priority: Major
>  Labels: pull-request-available
>
> In the {{approx_percentile}} expression, Spark casts decimal to double to 
> update the aggregation state 
> ([ApproximatePercentile.scala#L181|https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L181])
>  and casts the result double back to decimal 
> ([ApproximatePercentile.scala#L206|https://github.com/apache/spark/blob/933dc0c42f0caf74aaa077fd4f2c2e7208452b9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L206]).
>  The precision loss in the casts can make the result decimal out of its 
> precision range. This can lead to the following counter-intuitive results:
> {code:sql}
> spark-sql> select approx_percentile(col, 0.5) from values 
> (999) as tab(col);
> NULL
> spark-sql> select approx_percentile(col, 0.5) is null from values 
> (999) as tab(col);
> false
> spark-sql> select cast(approx_percentile(col, 0.5) as string) from values 
> (999) as tab(col);
> 1000
> spark-sql> desc select approx_percentile(col, 0.5) from values 
> (999) as tab(col);
> approx_percentile(col, 0.5, 1)   decimal(19,0) 
> {code}
> The result is actually not null, so the second query returns false. The first 
> query returns null because the result cannot fit into {{{}decimal(19, 0){}}}.
> A suggested fix is to use {{Decimal.changePrecision}} here to ensure the 
> result fits, and to genuinely return null or throw an exception when the result 
> doesn't fit.
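
A plain-Scala sketch of the precision loss described above, assuming a 19-digit input as a representative of the truncated literals in the quoted queries; BigDecimal stands in for Spark's Decimal, and the changePrecision helper below only mimics the suggested fix of checking that the rounded result still fits the declared precision.
{code:java}
object ApproxPercentileDecimalSketch {
  // Mimics the suggested Decimal.changePrecision check: return None (i.e., an
  // explicit null) when the value no longer fits decimal(precision, scale).
  def changePrecision(d: BigDecimal, precision: Int, scale: Int): Option[BigDecimal] = {
    val rounded = d.setScale(scale, BigDecimal.RoundingMode.HALF_UP)
    if (rounded.precision <= precision) Some(rounded) else None
  }

  def main(args: Array[String]): Unit = {
    val input     = BigDecimal("9999999999999999999") // 19 digits: fits decimal(19, 0)
    val viaDouble = BigDecimal(input.toDouble)         // lossy round trip through double
    println(viaDouble)                                 // 1.0E+19, i.e. rounded up to 10^19
    println(changePrecision(viaDouble, 19, 0))         // None: 10^19 needs 20 digits
  }
}
{code}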



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-45440) Incorrect summary counts from a CSV file

2023-10-06 Thread Bruce Robbins (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772724#comment-17772724
 ] 

Bruce Robbins commented on SPARK-45440:
---

I added {{inferSchema=true}} as a datasource option in your example and I got 
the expected answer. Otherwise it's doing a max and min on a string (not a 
number).
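
A short sketch of that suggestion (the path is a placeholder for the linked gist's file, and a header row is assumed; column names from the gist are not reproduced here): with inferSchema enabled, the numeric CSV columns come back as numeric types, so min/max aggregate numbers rather than strings.
{code:java}
// Assumes a spark-shell style session where `spark` is already available.
val df = spark.read
  .option("header", "true")      // assumes the CSV has a header row
  .option("inferSchema", "true") // without this, every CSV column is read as string
  .csv("AAPL.csv")               // placeholder path for the linked gist's file

df.printSchema() // price/volume columns should now be double/long instead of string
{code}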

> Incorrect summary counts from a CSV file
> 
>
> Key: SPARK-45440
> URL: https://issues.apache.org/jira/browse/SPARK-45440
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 3.5.0
> Environment: Pyspark version 3.5.0 
>Reporter: Evan Volgas
>Priority: Major
>  Labels: aggregation, bug, pyspark
>
> I am using pip-installed Pyspark version 3.5.0 inside the context of an 
> IPython shell. The task is straightforward: take [this CSV 
> file|https://gist.githubusercontent.com/evanvolgas/e5cb082673ec947239658291f2251de4/raw/a9c5e9866ac662a816f9f3828a2d184032f604f0/AAPL.csv]
>  of AAPL stock prices and compute the minimum and maximum volume weighted 
> average price for the entire file. 
> My code is 
> [here|https://gist.github.com/evanvolgas/e4aa75fec4179bb7075a5283867f127c]. I've 
> also performed the same computation in DuckDB because I noticed that the 
> results of the Spark code are wrong. 
> Literally, the exact same SQL in DuckDB and in Spark yields different results, 
> and Spark's are wrong. 
> I have never seen this behavior in a Spark release before. I'm very confused 
> by it, and curious if anyone else can replicate this behavior. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization. This behavior can be more 
visible when cached RDD size is high.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :         

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Replicated Stages DAGs:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :             

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance. For example, if 
there are 2 TableCacheQueryStage instances referencing same IMR instance 
(cached RDD) and first InMemoryTableScanExec' s materialization takes longer, 
following logic will return false (inMemoryTableScan.isMaterialized => false) 
and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Replicated Stages DAGs:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance.
For example, if there are 2 TableCacheQueryStage instances referencing same IMR 
instance (cached RDD) and first InMemoryTableScanExec' s materialization takes 
longer, following logic will return false (inMemoryTableScan.isMaterialized => 
false) and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- 

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached 
RDD) to provide runtime stats to apply AQE  optimizations onto remaining 
physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec 
eagerly by submitting job per TableCacheQueryStage instance.
For example, if there are 2 TableCacheQueryStage instances referencing same IMR 
instance (cached RDD) and first InMemoryTableScanExec' s materialization takes 
longer, following logic will return false (inMemoryTableScan.isMaterialized => 
false) and this may cause replicated IMR materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs uses same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i => {
val index = i % 5
(index, s"Employee_$index", s"Department_$index")
  }
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Replicated Stages DAGs:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- 

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
Replicated Stages DAGs:
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- 

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
Replicated Stages DAG:

!IMR Materialization - Stage 2.png|width=303,height=134!

!IMR Materialization - Stage 3.png|width=303,height=134!

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- 

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Attachment: IMR Materialization - Stage 3.png
IMR Materialization - Stage 2.png

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Eren Avsarogullari
>Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - 
> Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats so that AQE optimizations can be applied 
> to the remaining physical plan stages. TableCacheQueryStage materializes 
> InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
> instance.
> For example, if there are 2 TableCacheQueryStage instances referencing the 
> same IMR instance (cached RDD) and the first InMemoryTableScanExec's 
> materialization takes longer, the following logic will return false 
> (inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
> materialization.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
> Replicated Stages DAG:
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Description: 
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
Replicated Stages DAG:

 

  was:
TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), 

[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization

2023-10-06 Thread Eren Avsarogullari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eren Avsarogullari updated SPARK-45443:
---
Summary: Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
materialization  (was: Revisit TableCacheQueryStage to avoid replicated IMR 
materialization)

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Eren Avsarogullari
>Priority: Major
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats so that AQE optimizations can be applied 
> to the remaining physical plan stages. TableCacheQueryStage materializes 
> InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
> instance.
> For example, if there are 2 TableCacheQueryStage instances referencing the 
> same IMR instance (cached RDD) and the first InMemoryTableScanExec's 
> materialization takes longer, the following logic will return false 
> (inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
> materialization.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated IMR materialization

2023-10-06 Thread Eren Avsarogullari (Jira)
Eren Avsarogullari created SPARK-45443:
--

 Summary: Revisit TableCacheQueryStage to avoid replicated IMR 
materialization
 Key: SPARK-45443
 URL: https://issues.apache.org/jira/browse/SPARK-45443
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.5.0
Reporter: Eren Avsarogullari


TableCacheQueryStage is created per InMemoryTableScanExec by 
AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
(cached RDD) to provide runtime stats so that AQE optimizations can be applied 
to the remaining physical plan stages. TableCacheQueryStage materializes 
InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage 
instance.
For example, if there are 2 TableCacheQueryStage instances referencing the same 
IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization 
takes longer, the following logic will return false 
(inMemoryTableScan.isMaterialized => false), which may cause replicated IMR 
materialization.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-45425) Mapped TINYINT to ShortType for MsSqlServerDialect

2023-10-06 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang resolved SPARK-45425.

Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43232
[https://github.com/apache/spark/pull/43232]

> Mapped TINYINT to ShortType for MsSqlServerDialect
> --
>
> Key: SPARK-45425
> URL: https://issues.apache.org/jira/browse/SPARK-45425
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Zerui Bao
>Assignee: Zerui Bao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> TINYINT in SQL Server is not correctly mapped to Spark types when using the 
> JDBC connector. Currently it is mapped to IntegerType, which is not accurate; 
> it should be mapped to ShortType.
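For context on where such a mapping lives, the sketch below shows the general 
shape of a JdbcDialect override that maps TINYINT to ShortType. It is an 
illustration only, not the merged patch, and the dialect object name is made up.
{code:java}
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, ShortType}

// Hypothetical dialect for illustration only: SQL Server's TINYINT is unsigned
// (0..255), so ShortType is the smallest Catalyst type that can hold it.
object TinyIntAsShortDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.toLowerCase.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
    if (sqlType == Types.TINYINT) Some(ShortType) else None
}

// JdbcDialects.registerDialect(TinyIntAsShortDialect)  // how a custom dialect is plugged in
{code}
A custom dialect like this would normally be registered via 
JdbcDialects.registerDialect; the built-in MsSqlServerDialect is where the 
actual change belongs.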



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45425) Mapped TINYINT to ShortType for MsSqlServerDialect

2023-10-06 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang reassigned SPARK-45425:
--

Assignee: Zerui Bao

> Mapped TINYINT to ShortType for MsSqlServerDialect
> --
>
> Key: SPARK-45425
> URL: https://issues.apache.org/jira/browse/SPARK-45425
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Zerui Bao
>Assignee: Zerui Bao
>Priority: Major
>  Labels: pull-request-available
>
> TINYINT in SQL Server is not correctly mapped to Spark types when using the 
> JDBC connector. Currently it is mapped to IntegerType, which is not accurate; 
> it should be mapped to ShortType.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45442) Refine docstring of `DataFrame.show`

2023-10-06 Thread Allison Wang (Jira)
Allison Wang created SPARK-45442:


 Summary: Refine docstring of `DataFrame.show`
 Key: SPARK-45442
 URL: https://issues.apache.org/jira/browse/SPARK-45442
 Project: Spark
  Issue Type: Sub-task
  Components: Documentation, PySpark
Affects Versions: 4.0.0
Reporter: Allison Wang


Refine docstring of `DataFrame.show()`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45441) Introduce more util functions for PythonWorkerUtils

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45441:
---
Labels: pull-request-available  (was: )

> Introduce more util functions for PythonWorkerUtils
> ---
>
> Key: SPARK-45441
> URL: https://issues.apache.org/jira/browse/SPARK-45441
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 4.0.0
>Reporter: Takuya Ueshin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45441) Introduce more util functions for PythonWorkerUtils

2023-10-06 Thread Takuya Ueshin (Jira)
Takuya Ueshin created SPARK-45441:
-

 Summary: Introduce more util functions for PythonWorkerUtils
 Key: SPARK-45441
 URL: https://issues.apache.org/jira/browse/SPARK-45441
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 4.0.0
Reporter: Takuya Ueshin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45440) Incorrect summary counts from a CSV file

2023-10-06 Thread Evan Volgas (Jira)
Evan Volgas created SPARK-45440:
---

 Summary: Incorrect summary counts from a CSV file
 Key: SPARK-45440
 URL: https://issues.apache.org/jira/browse/SPARK-45440
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 3.5.0
 Environment: Pyspark version 3.5.0 
Reporter: Evan Volgas


I am using pip-installed PySpark version 3.5.0 inside an IPython shell. The task 
is straightforward: take [this CSV 
file|https://gist.githubusercontent.com/evanvolgas/e5cb082673ec947239658291f2251de4/raw/a9c5e9866ac662a816f9f3828a2d184032f604f0/AAPL.csv]
 of AAPL stock prices and compute the minimum and maximum volume-weighted 
average price for the entire file.

My code is 
[here|https://gist.github.com/evanvolgas/e4aa75fec4179bb7075a5283867f127c]. I've 
also performed the same computation in DuckDB because I noticed that the 
results of the Spark code are wrong.

Literally, the exact same SQL in DuckDB and in Spark yields different results, 
and Spark's are wrong.

I have never seen this behavior in a Spark release before. I'm very confused by 
it, and curious if anyone else can replicate this behavior. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45438) Decimal precision exceeds max precision error when using unary minus on min Decimal values on Scala 2.13 Spark

2023-10-06 Thread Navin Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navin Kumar updated SPARK-45438:

Summary: Decimal precision exceeds max precision error when using unary 
minus on min Decimal values on Scala 2.13 Spark  (was: Decimal precision 
exceeds max precision error when using unary minus on min Decimal values)

> Decimal precision exceeds max precision error when using unary minus on min 
> Decimal values on Scala 2.13 Spark
> --
>
> Key: SPARK-45438
> URL: https://issues.apache.org/jira/browse/SPARK-45438
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.2.1, 3.3.0, 3.2.2, 3.3.1, 3.2.3, 3.2.4, 3.3.3, 
> 3.3.2, 3.4.0, 3.4.1, 3.5.0
>Reporter: Navin Kumar
>Priority: Major
>  Labels: scala
>
> When submitting an application to Spark built with Scala 2.13, there are 
> issues with Decimal overflow that show up when using unary minus (and also 
> {{abs()}}, which uses unary minus under the hood).
> Here is an example PySpark reproducer:
> {code}
> from decimal import Decimal
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType,StructField, DecimalType
> spark = SparkSession.builder \
>   .master("local[*]") \
>   .appName("decimal_precision") \
>   .config("spark.rapids.sql.explain", "ALL") \
>   .config("spark.sql.ansi.enabled", "true") \
>   .config("spark.sql.legacy.allowNegativeScaleOfDecimal", 'true') \
>   .getOrCreate()  
> precision = 38
> scale = 0
> DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
> data = [[DECIMAL_MIN]]
> schema = StructType([
> StructField("a", DecimalType(precision, scale), True)])
> df = spark.createDataFrame(data=data, schema=schema)
> df.selectExpr("a", "-a").show()
> {code}
> This particular example runs successfully on Spark built with Scala 2.12, but 
> throws a java.math.ArithmeticException on Spark built with Scala 2.13. 
> If you change the value of {{DECIMAL_MIN}} in the previous code to something 
> just above the original DECIMAL_MIN, no exception is thrown; instead you get 
> an incorrect answer (possibly due to overflow):
> {code}
> ...
> DECIMAL_MIN = Decimal('-8' + ('9' * (precision-1)) + 'e' + str(-scale))
> ...
> {code} 
> Output:
> {code}
> +++
> |   a|   (- a)|
> +++
> |-8999...|9...|
> +++
> {code}
> It looks like the code in {{Decimal.scala}} uses {{scala.math.BigDecimal}}. 
> See https://github.com/scala/bug/issues/11590 for updates on how Scala 2.13 
> handles BigDecimal. It looks like a {{java.math.MathContext}} is missing when 
> these operations are performed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45438) Decimal precision exceeds max precision error when using unary minus on min Decimal values

2023-10-06 Thread Navin Kumar (Jira)
Navin Kumar created SPARK-45438:
---

 Summary: Decimal precision exceeds max precision error when using 
unary minus on min Decimal values
 Key: SPARK-45438
 URL: https://issues.apache.org/jira/browse/SPARK-45438
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.5.0, 3.4.1, 3.4.0, 3.3.2, 3.3.3, 3.2.4, 3.2.3, 3.3.1, 
3.2.2, 3.3.0, 3.2.1, 3.2.0
Reporter: Navin Kumar


When submitting an application to Spark built with Scala 2.13, there are issues 
with Decimal overflow that show up when using unary minus (and also {{abs()}}, 
which uses unary minus under the hood).

Here is an example PySpark reproducer:

{code}
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, DecimalType

spark = SparkSession.builder \
  .master("local[*]") \
  .appName("decimal_precision") \
  .config("spark.rapids.sql.explain", "ALL") \
  .config("spark.sql.ansi.enabled", "true") \
  .config("spark.sql.legacy.allowNegativeScaleOfDecimal", 'true') \
  .getOrCreate()  

precision = 38
scale = 0
DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))

data = [[DECIMAL_MIN]]

schema = StructType([
StructField("a", DecimalType(precision, scale), True)])
df = spark.createDataFrame(data=data, schema=schema)

df.selectExpr("a", "-a").show()
{code}

This particular example runs successfully on Spark built with Scala 2.12, but 
throws a java.math.ArithmeticException on Spark built with Scala 2.13.

If you change the value of {{DECIMAL_MIN}} in the previous code to something 
just above the original DECIMAL_MIN, no exception is thrown; instead you get an 
incorrect answer (possibly due to overflow):

{code}
...
DECIMAL_MIN = Decimal('-8' + ('9' * (precision-1)) + 'e' + str(-scale))
...
{code} 

Output:
{code}
+++
|   a|   (- a)|
+++
|-8999...|9...|
+++
{code}

It looks like the code in {{Decimal.scala}} uses {{scala.math.BigDecimal}}. See 
https://github.com/scala/bug/issues/11590 for updates on how Scala 2.13 handles 
BigDecimal. It looks like a {{java.math.MathContext}} is missing when these 
operations are performed.
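As a rough illustration of the direction the linked Scala issue points at, here 
is a small Scala sketch. It assumes the rounding comes from 
scala.math.BigDecimal's default 34-digit DECIMAL128 context; it is not a 
statement of the actual fix in Decimal.scala.
{code:java}
import java.math.MathContext

// Construct the 38-digit minimum with an explicit MathContext wide enough for
// the value, so that negation cannot silently round it down to 34 digits.
val precision = 38
val minDecimal = BigDecimal("-" + ("9" * precision), new MathContext(precision))
val negated = -minDecimal

assert(negated.precision == precision)  // holds when the context is carried explicitly
println(negated)
{code}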



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45426) [CORE] Add support for a ReloadingTrustManager

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45426:
---
Labels: pull-request-available  (was: )

> [CORE] Add support for a ReloadingTrustManager
> --
>
> Key: SPARK-45426
> URL: https://issues.apache.org/jira/browse/SPARK-45426
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Hasnain Lakhani
>Priority: Major
>  Labels: pull-request-available
>
> For the RPC SSL feature, this allows us to properly reload the trust store if 
> needed at runtime.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44639) Add option to use Java tmp dir for RocksDB state store

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-44639:
---
Labels: pull-request-available  (was: )

> Add option to use Java tmp dir for RocksDB state store
> --
>
> Key: SPARK-44639
> URL: https://issues.apache.org/jira/browse/SPARK-44639
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 3.4.1
>Reporter: Adam Binford
>Priority: Major
>  Labels: pull-request-available
>
> Currently local RocksDB state is stored in a local directory given by 
> Utils.getLocalDir. On YARN this is a directory created inside the root 
> application folder, such as
> {{/tmp/nm-local-dir/usercache//appcache//}}
> The problem with this is that if an executor crashes for some reason (like 
> OOM) and the shutdown hooks don't get run, this directory stays around until 
> the application finishes, which can cause jobs to slowly accumulate more and 
> more temporary space until finally the node manager goes unhealthy.
> Because this data will only ever be accessed by the executor that created 
> this directory, it would make sense to store the data inside the container 
> folder, which always gets cleaned up by the node manager when that YARN 
> container is cleaned up. YARN sets `java.io.tmpdir` to a path inside this 
> directory, such as
> {{/tmp/nm-local-dir/usercache//appcache///tmp/}}
> I'm not sure of the behavior for other resource managers, so this could be an 
> opt-in config.
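For concreteness, the container-scoped location the description proposes opting 
into is simply the JVM's java.io.tmpdir; the snippet below just shows how an 
executor could observe it (the opt-in config knob itself is the proposal and 
has no name yet).
{code:java}
// Runs inside an executor JVM: on YARN this resolves to a path under the
// container directory, which the node manager removes together with the container.
val containerScopedTmp = System.getProperty("java.io.tmpdir")
println(containerScopedTmp)
{code}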



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45437) Upgrade SNAPPY to 1.1.10.5 to pick up fix re Linux PLE64

2023-10-06 Thread N Campbell (Jira)
N Campbell created SPARK-45437:
--

 Summary: Upgrade SNAPPY to 1.1.10.5 to pick up fix re Linux PLE64
 Key: SPARK-45437
 URL: https://issues.apache.org/jira/browse/SPARK-45437
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: N Campbell


SPARK-45323 moved to Snappy 1.1.10.4, and that change is proposed for Spark 3.5.1.

Snappy versions prior to 1.1.10.5 will not work on Linux PLE64.

Moving to Snappy 1.1.10.5 will address that issue:
https://github.com/xerial/snappy-java/pull/515



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40513) SPIP: Support Docker Official Image for Spark

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-40513:
---
Labels: SPIP pull-request-available  (was: SPIP)

> SPIP: Support Docker Official Image for Spark
> -
>
> Key: SPARK-40513
> URL: https://issues.apache.org/jira/browse/SPARK-40513
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Spark Docker
>Affects Versions: 3.5.0
>Reporter: Yikun Jiang
>Assignee: Yikun Jiang
>Priority: Major
>  Labels: SPIP, pull-request-available
> Fix For: 3.5.0
>
>
> This SPIP proposes adding a [Docker Official 
> Image (DOI)|https://github.com/docker-library/official-images] to ensure the 
> Spark Docker images meet the quality standards for Docker images and to 
> provide these images for users who want to run Apache Spark via a Docker image.
> There are also several [Apache projects that release Docker Official 
> Images|https://hub.docker.com/search?q=apache_filter=official], such 
> as: [flink|https://hub.docker.com/_/flink], 
> [storm|https://hub.docker.com/_/storm], [solr|https://hub.docker.com/_/solr], 
> [zookeeper|https://hub.docker.com/_/zookeeper], and 
> [httpd|https://hub.docker.com/_/httpd] (with 50M+ to 1B+ downloads each). 
> The huge download counts show real user demand, and the fact that other 
> Apache projects support DOI suggests we should be able to do the same.
> After support:
>  * The Dockerfile will still be maintained by the Apache Spark community and 
> reviewed by Docker.
>  * The images will be maintained by the Docker community to meet the Docker 
> community's quality standards for Docker images.
> It will also reduce the Apache Spark community's extra Docker image 
> maintenance effort (such as frequent rebuilds and image security updates).
>  
> SPIP DOC: 
> [https://docs.google.com/document/d/1nN-pKuvt-amUcrkTvYAQ-bJBgtsWb9nAkNoVNRM2S2o]
> DISCUSS: [https://lists.apache.org/thread/l1793y5224n8bqkp3s6ltgkykso4htb3]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45435) Document that lazy checkpoint may not be a consistent snapshot

2023-10-06 Thread Juliusz Sompolski (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juliusz Sompolski updated SPARK-45435:
--
Summary: Document that lazy checkpoint may not be a consistent snapshot  (was: 
Document that lazy checkpoint may cause non-determinism)

> Document that lazy checkpoint may not be a consistent snapshot
> -
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
>  Issue Type: Documentation
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Juliusz Sompolski
>Priority: Major
>  Labels: pull-request-available
>
> Some people may want to use checkpoint to get a consistent snapshot of the 
> Dataset / RDD. Warn that this is not the case with lazy checkpoint, because 
> checkpoint is computed only at the end of the first action, and the data used 
> during the first action may be different because of non-determinism and 
> retries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45436) DataFrame methods check same session

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45436:
---
Labels: pull-request-available  (was: )

> DataFrame methods check same session
> 
>
> Key: SPARK-45436
> URL: https://issues.apache.org/jira/browse/SPARK-45436
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect, PySpark
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45435) Document that lazy checkpoint may cause non-determinism

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45435:
---
Labels: pull-request-available  (was: )

> Document that lazy checkpoint may cause non-determinism
> --
>
> Key: SPARK-45435
> URL: https://issues.apache.org/jira/browse/SPARK-45435
> Project: Spark
>  Issue Type: Documentation
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Juliusz Sompolski
>Priority: Major
>  Labels: pull-request-available
>
> Some people may want to use checkpoint to get a consistent snapshot of the 
> Dataset / RDD. Warn that this is not the case with lazy checkpoint, because 
> checkpoint is computed only at the end of the first action, and the data used 
> during the first action may be different because of non-determinism and 
> retries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45436) DataFrame methods check same session

2023-10-06 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-45436:
-

 Summary: DataFrame methods check same session
 Key: SPARK-45436
 URL: https://issues.apache.org/jira/browse/SPARK-45436
 Project: Spark
  Issue Type: Improvement
  Components: Connect, PySpark
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45435) Document that lazy checkpoint may cause non-determinism

2023-10-06 Thread Juliusz Sompolski (Jira)
Juliusz Sompolski created SPARK-45435:
-

 Summary: Document that lazy checkpoint may cause non-determinism
 Key: SPARK-45435
 URL: https://issues.apache.org/jira/browse/SPARK-45435
 Project: Spark
  Issue Type: Documentation
  Components: Spark Core, SQL
Affects Versions: 4.0.0
Reporter: Juliusz Sompolski


Some people may want to use checkpoint to get a consistent snapshot of the 
Dataset / RDD. Warn that this is not the case with lazy checkpoint, because 
checkpoint is computed only at the end of the first action, and the data used 
during the first action may be different because of non-determinism and retries.
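A small sketch of the distinction being documented, assuming a running 
SparkSession `spark`, an existing DataFrame `df`, and a checkpoint directory 
already configured; this is an illustration, not part of the proposed 
documentation change.
{code:java}
// spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  // required beforehand

// Eager checkpoint: the plan is executed and written out right here, so the
// returned Dataset is a snapshot of the data as of this point.
val snapshot = df.checkpoint(eager = true)

// Lazy checkpoint: nothing is computed yet; the checkpoint data is produced by
// the first action, so retries and non-determinism before that action can
// change what actually gets snapshotted.
val lazySnapshot = df.checkpoint(eager = false)
lazySnapshot.count()  // this first action is what materializes the checkpoint
{code}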



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-45434) LogisticRegression checks the training labels

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng resolved SPARK-45434.
---
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43246
[https://github.com/apache/spark/pull/43246]

> LogisticRegression checks the training labels
> -
>
> Key: SPARK-45434
> URL: https://issues.apache.org/jira/browse/SPARK-45434
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45434) LogisticRegression checks the training labels

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng reassigned SPARK-45434:
-

Assignee: Ruifeng Zheng

> LogisticRegression checks the training labels
> -
>
> Key: SPARK-45434
> URL: https://issues.apache.org/jira/browse/SPARK-45434
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-45433:
--

Assignee: Apache Spark

> CSV/JSON schema inference when timestamps do not match specified 
> timestampFormat with only one row on each partition report error
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Jia Fan
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>
> CSV/JSON schema inference reports an error when timestamps do not match the 
> specified timestampFormat and there is `only one row on each partition`.
> {code:java}
> //eg
> val csv = spark.read.option("timestampFormat", "-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> //error
> Caused by: java.time.format.DateTimeParseException: Text 
> '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 
> 19 {code}
> This bug affects 3.3/3.4/3.5. Unlike 
> https://issues.apache.org/jira/browse/SPARK-45424, this is a different bug 
> with the same error message.
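As a side note for anyone hitting this before a fix lands, here is a workaround 
sketch (the `_c0` column name and the explicit schema are illustrative, not part 
of the reported fix): supplying the schema up front skips inference entirely, so 
the mismatched timestampFormat never has to parse the sampled row.
{code:java}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

// Read the same single-row input with an explicit schema instead of inferSchema,
// keeping the value as a plain string (it can be parsed to a timestamp later).
val schema = StructType(Seq(StructField("_c0", StringType)))
val csv = spark.read
  .schema(schema)
  .csv(Seq("2884-06-24T02:45:51.138").toDS())
csv.show()
{code}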



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-45433:
--

Assignee: (was: Apache Spark)

> CSV/JSON schema inference when timestamps do not match specified 
> timestampFormat with only one row on each partition report error
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Jia Fan
>Priority: Major
>  Labels: pull-request-available
>
> CSV/JSON schema inference reports an error when timestamps do not match the 
> specified timestampFormat and there is `only one row on each partition`.
> {code:java}
> //eg
> val csv = spark.read.option("timestampFormat", "-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> //error
> Caused by: java.time.format.DateTimeParseException: Text 
> '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 
> 19 {code}
> This bug affects 3.3/3.4/3.5. Unlike 
> https://issues.apache.org/jira/browse/SPARK-45424, this is a different bug 
> with the same error message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-45433:
--

Assignee: (was: Apache Spark)

> CSV/JSON schema inference when timestamps do not match specified 
> timestampFormat with only one row on each partition report error
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Jia Fan
>Priority: Major
>  Labels: pull-request-available
>
> CSV/JSON schema inference reports an error when timestamps do not match the 
> specified timestampFormat and there is `only one row on each partition`.
> {code:java}
> //eg
> val csv = spark.read.option("timestampFormat", "-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> //error
> Caused by: java.time.format.DateTimeParseException: Text 
> '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 
> 19 {code}
> This bug affects 3.3/3.4/3.5. Unlike 
> https://issues.apache.org/jira/browse/SPARK-45424, this is a different bug 
> with the same error message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45433) CSV/JSON schema inference when timestamps do not match specified timestampFormat with only one row on each partition report error

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot reassigned SPARK-45433:
--

Assignee: Apache Spark

> CSV/JSON schema inference reports an error when timestamps do not match the 
> specified timestampFormat and each partition contains only one row
> -
>
> Key: SPARK-45433
> URL: https://issues.apache.org/jira/browse/SPARK-45433
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Jia Fan
>Assignee: Apache Spark
>Priority: Major
>  Labels: pull-request-available
>
> CSV/JSON schema inference reports an error when timestamps do not match the 
> specified timestampFormat and each partition contains only one row.
> {code:java}
> // Example
> val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
>   .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
> csv.show() {code}
> {code:java}
> // Error
> Caused by: java.time.format.DateTimeParseException: Text 
> '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 
> 19 {code}
> This bug affects 3.3, 3.4, and 3.5. It is a different bug from 
> https://issues.apache.org/jira/browse/SPARK-45424 but produces the same 
> error message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45434) LogisticRegression checks the training labels

2023-10-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45434:
---
Labels: pull-request-available  (was: )

> LogisticRegression checks the training labels
> -
>
> Key: SPARK-45434
> URL: https://issues.apache.org/jira/browse/SPARK-45434
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>
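
The issue has no description yet; from the title, the improvement presumably makes LogisticRegression fail fast on labels it cannot train on. A rough sketch of that kind of check (my own illustration assuming a DoubleType "label" column, not the actual patch):

{code:java}
import org.apache.spark.sql.{DataFrame, functions => F}

// Reject labels LogisticRegression cannot use: null, NaN, negative,
// or non-integer values (labels are expected to be 0.0, 1.0, ..., k - 1).
def validateLabels(df: DataFrame, labelCol: String = "label"): Unit = {
  val c = F.col(labelCol)
  val invalid = df.filter(c.isNull || c.isNaN || c < 0 || c =!= F.floor(c))
  require(invalid.isEmpty,
    s"Column '$labelCol' must contain only non-negative integer-valued doubles")
}
{code}

A check like this turns a confusing training failure into an explicit error before any iterations run.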




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45434) Validate the training labels

2023-10-06 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-45434:
-

 Summary: Validate the training labels
 Key: SPARK-45434
 URL: https://issues.apache.org/jira/browse/SPARK-45434
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45434) LogisticRegression checks the training labels

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng updated SPARK-45434:
--
Summary: LogisticRegression checks the training labels  (was: 
LogisticRegression check the training labels)

> LogisticRegression checks the training labels
> -
>
> Key: SPARK-45434
> URL: https://issues.apache.org/jira/browse/SPARK-45434
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45434) LogisticRegression check the training labels

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng updated SPARK-45434:
--
Summary: LogisticRegression check the training labels  (was: Validate the 
training labels)

> LogisticRegression check the training labels
> 
>
> Key: SPARK-45434
> URL: https://issues.apache.org/jira/browse/SPARK-45434
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45432) Remove deprecated Hadoop-2 `LocatedFileStatus` constructor

2023-10-06 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-45432:
-

Assignee: Dongjoon Hyun

> Remove deprecated Hadoop-2 `LocatedFileStatus` constructor
> --
>
> Key: SPARK-45432
> URL: https://issues.apache.org/jira/browse/SPARK-45432
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-45432) Remove deprecated Hadoop-2 `LocatedFileStatus` constructor

2023-10-06 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-45432.
---
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43239
[https://github.com/apache/spark/pull/43239]

> Remove deprecated Hadoop-2 `LocatedFileStatus` constructor
> --
>
> Key: SPARK-45432
> URL: https://issues.apache.org/jira/browse/SPARK-45432
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-45357) Maven test `SparkConnectProtoSuite` failed

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng resolved SPARK-45357.
---
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 43155
[https://github.com/apache/spark/pull/43155]

> Maven test `SparkConnectProtoSuite` failed
> --
>
> Key: SPARK-45357
> URL: https://issues.apache.org/jira/browse/SPARK-45357
> Project: Spark
>  Issue Type: Bug
>  Components: Connect
>Affects Versions: 4.0.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
>  
> build/mvn clean install -pl connector/connect/server -am -DskipTests
> mvn test -pl connector/connect/server 
>  
> {code:java}
> - Test observe *** FAILED ***
>   == FAIL: Plans do not match ===
>   !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0
>    +- LocalRelation , [id#0, name#0]
>   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
>    +- LocalRelation , [id#0, name#0]
>   (PlanTest.scala:179) {code}
>  
>  
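
In the diff above the two plans differ only in the trailing integer on the CollectMetrics node (0 vs 53), which looks like an internal plan/dataframe id rather than a structural difference. For reference, a query of this shape can be built with the public Dataset.observe API (a sketch of my own, not the suite's code):

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min, sum}

val spark = SparkSession.builder().master("local[1]").appName("observe-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// observe() attaches a CollectMetrics node named "my_metric" over the child plan,
// matching the shape shown in the failure output above.
val observed = df.observe("my_metric",
  min($"id").as("min_val"), max($"id").as("max_val"), sum($"id"))

println(observed.queryExecution.analyzed)
{code}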



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-45357) Maven test `SparkConnectProtoSuite` failed

2023-10-06 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng reassigned SPARK-45357:
-

Assignee: Yang Jie

> Maven test `SparkConnectProtoSuite` failed
> --
>
> Key: SPARK-45357
> URL: https://issues.apache.org/jira/browse/SPARK-45357
> Project: Spark
>  Issue Type: Bug
>  Components: Connect
>Affects Versions: 4.0.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Major
>  Labels: pull-request-available
>
>  
> build/mvn clean install -pl connector/connect/server -am -DskipTests
> mvn test -pl connector/connect/server 
>  
> {code:java}
> - Test observe *** FAILED ***
>   == FAIL: Plans do not match ===
>   !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0
>    +- LocalRelation , [id#0, name#0]
>   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
>    +- LocalRelation , [id#0, name#0]
>   (PlanTest.scala:179) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org