[jira] [Commented] (SPARK-24193) Sort by disk when number of limit is big in TakeOrderedAndProjectExec

2020-05-08 Thread Xianjin YE (Jira)


[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102458#comment-17102458 ]

Xianjin YE commented on SPARK-24193:


I used `df.rdd.collect` intentionally to trigger the problem, as `df.collect` is 
converted to `SparkPlan.executeTake`, which returns the data correctly.

 

The problem can also be triggered with a slightly different version:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils  // Spark-internal test utility; any temp directory works here

val spark = SparkSession
  .builder
  .appName("Spark TopK test")
  .master("local-cluster[8, 1, 1024]")
  .getOrCreate()
val temp1 = Utils.createTempDir()
val data = spark.range(10, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

// limit (200) exceeds the threshold (100): the fallback path is taken
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
data.orderBy("id").limit(200).write.mode("overwrite").parquet(temp1.toString)
val topKInSort = spark.read.parquet(temp1.toString).collect()

// limit below the threshold: the in-memory TakeOrderedAndProjectExec is used
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
data.orderBy("id").limit(200).write.mode("overwrite").parquet(temp1.toString)
val topKInMemory = spark.read.parquet(temp1.toString).collect()

println(topKInMemory.map(_.getLong(0)).mkString("[", ",", "]"))
println(topKInSort.map(_.getLong(0)).mkString("[", ",", "]"))
assert(topKInMemory sameElements topKInSort)
{code}
The real problem is that if I then access the ordered and limited data, e.g. by 
joining it or writing it to an external table, the data is incorrect when 
falling back to CollectLimitExec.

> Sort by disk when number of limit is big in TakeOrderedAndProjectExec
> -
>
> Key: SPARK-24193
> URL: https://issues.apache.org/jira/browse/SPARK-24193
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jin Xing
>Assignee: Jin Xing
>Priority: Major
> Fix For: 2.4.0
>
>
> Physical plan of  "_select colA from t order by colB limit M_" is 
> _TakeOrderedAndProject_;
> Currently _TakeOrderedAndProject_ sorts data in memory, see 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
>  
> Shall we add a config so that, if the limit (M) is too big, we sort on 
> disk? That way the memory issue can be resolved.






[jira] [Commented] (SPARK-24193) Sort by disk when number of limit is big in TakeOrderedAndProjectExec

2020-05-08 Thread Wenchen Fan (Jira)


[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102439#comment-17102439 ]

Wenchen Fan commented on SPARK-24193:
-

I think it's not a problem if you do `df.collect` instead of `df.rdd.collect`.

LIMIT only preserves the data order if it is the last operation. When you call 
`df.rdd`, it means you are going to add more operations on top of it.
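
For illustration, a minimal sketch of the two call paths (the data and names here are made up for the example; the comments describe the expected behaviour per the explanation above):
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("limit-order-sketch").master("local[4]").getOrCreate()

val sorted = spark.range(1, 101).toDF("id").orderBy(col("id").desc).limit(10)

// Dataset.collect() goes through SparkPlan.executeTake, so the globally
// sorted prefix is returned in order.
val viaCollect = sorted.collect()

// Dataset.rdd adds more operations on top of the plan; LIMIT is then no
// longer the last operation and its output order is not guaranteed.
val viaRdd = sorted.rdd.map(_.getLong(0)).collect()
{code}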







[jira] [Commented] (SPARK-24193) Sort by disk when number of limit is big in TakeOrderedAndProjectExec

2020-05-07 Thread Xianjin YE (Jira)


[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102285#comment-17102285 ]

Xianjin YE commented on SPARK-24193:


Hi [~jinxing6...@126.com] [~cloud_fan], the fallback config has a correctness 
issue; we may need to revert this change in Spark 2.4 and Spark 3.0.

 

Way to reproduce:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession
  .builder
  .appName("Spark TopK test")
  .master("local-cluster[8, 1, 1024]")
  .getOrCreate()
val data = spark.range(10, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

// limit (200) exceeds the threshold (100): the fallback path is taken
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
val topKInSort = data.orderBy("id").limit(200).rdd.collect()

// limit below the threshold: the in-memory TakeOrderedAndProjectExec is used
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
val topKInMemory = data.orderBy("id").limit(200).rdd.collect()

println(topKInMemory.map(_.getLong(0)).mkString("[", ",", "]"))
println(topKInSort.map(_.getLong(0)).mkString("[", ",", "]"))
assert(topKInMemory sameElements topKInSort)
{code}
 

The issue:

`CollectLimitExec`'s core idea is
`sortedRDD.mapPartitionsInternal(_.take(limit)).repartition(1).mapPartitionsInternal(_.take(limit))`,
which doesn't guarantee the ordering semantics, so we cannot simply fall back 
to CollectLimitExec.
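
A standalone sketch of that shape on plain RDDs, to show why a per-partition take followed by repartition(1) cannot guarantee a globally ordered prefix (illustrative only; `mapPartitions` stands in for the internal `mapPartitionsInternal`):
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("collect-limit-sketch").master("local[4]").getOrCreate()
val sc = spark.sparkContext

// Globally sorted, range-partitioned RDD, like the sorted child of CollectLimitExec.
val sortedRdd = sc.parallelize(1L to 20L, 4).sortBy(identity)

val limit = 5
val limited = sortedRdd
  .mapPartitions(_.take(limit))  // keep at most `limit` rows per partition
  .repartition(1)                // shuffles; does not preserve row order
  .mapPartitions(_.take(limit))  // takes whichever `limit` rows arrive first

// The row count is right, but the rows may come from arbitrary partitions in
// arbitrary order, so this is not necessarily [1,2,3,4,5].
println(limited.collect().mkString("[", ",", "]"))
{code}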

 

Proposal to fix:
 # Revert the fallback logic.
 # Implement CollectLimitExec similarly to CollectTailExec; however, that may not be suitable if the k in TopK is large enough.
 # Alternatively, do a calculation similar to CollectTailExec's, but first collect only the record count of each partition; with that information we can take the exact number of records from each partition (a rough sketch of this idea follows below).
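
A rough sketch of idea 3, assuming the child is a globally sorted, range-partitioned RDD (names and structure here are illustrative, not a concrete implementation):
{code:java}
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def takeOrderedPrefix[T: ClassTag](sortedRdd: RDD[T], limit: Int): Array[T] = {
  // First pass: only per-partition record counts go to the driver.
  val counts = sortedRdd.mapPartitions(it => Iterator.single(it.size)).collect()

  // Decide how many rows each partition contributes to the global prefix.
  var remaining = limit.toLong
  val perPartition = counts.map { c =>
    val take = math.min(c.toLong, math.max(remaining, 0L)).toInt
    remaining -= c
    take
  }

  // Second pass: fetch exactly the computed number of rows from each partition;
  // partition order and intra-partition order are preserved, so the result is
  // the exact top-`limit` prefix. The sorted child should be cached so it is
  // not recomputed between the two passes.
  sortedRdd
    .mapPartitionsWithIndex { case (idx, it) => it.take(perPartition(idx)) }
    .collect()
}
{code}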







[jira] [Commented] (SPARK-24193) Sort by disk when number of limit is big in TakeOrderedAndProjectExec

2018-05-06 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465192#comment-16465192 ]

Apache Spark commented on SPARK-24193:
--

User 'jinxing64' has created a pull request for this issue:
https://github.com/apache/spark/pull/21252




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org