[jira] [Commented] (SPARK-37185) DataFrame.take() only uses one worker

2021-11-02 Thread mathieu longtin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437585#comment-17437585
 ] 

mathieu longtin commented on SPARK-37185:
-----------------------------------------

It seems to optimize for simple queries, but not for more complex ones. That 
kind of makes sense for "select * from t", but any where clause can filter out 
most rows, which makes the single-partition scan very slow.
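
The difference shows up if you inspect the two code paths; a quick sketch, 
assuming the same session and the table t from the timings below:
{code:python}
# Sketch: inspect how the two code paths execute.
df = spark.sql("select * from t where x = 99")

# The DataFrame path plans a CollectLimit, which is what drives the
# scan-one-partition-first behavior of collect()/take().
df.limit(10).explain()

# Going through .rdd compiles the limit into a plain RDD job instead, so
# every partition is scanned in parallel and the limit is applied afterwards.
print(df.limit(10).rdd.toDebugString().decode())
{code}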

It looks like it scans the first part, doesn't find enough rows, then scans 
four parts, then decides to scan everything. That's nice in principle, but 
meanwhile I have 20 workers already reserved; it wouldn't cost anything more 
to scan everything right away.
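
For what it's worth, that ramp-up appears to be controlled by 
spark.sql.limit.scaleUpFactor (default 4); a minimal sketch of raising it, 
assuming the same session and table:
{code:python}
# Sketch: make take() ramp up more aggressively than 1 -> 4 -> everything.
# spark.sql.limit.scaleUpFactor (default 4) multiplies the number of
# partitions scanned on each retry when the limit isn't yet satisfied.
spark.conf.set("spark.sql.limit.scaleUpFactor", 20)

# The second attempt now scans roughly 20 partitions instead of 4.
spark.sql("select * from t where x = 99").take(10)
{code}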

Timing: the table is not cached and consists of 69 csv.gz files ranging from 
1 MB to 2.2 GB each:
{code:python}
In [1]: %time spark.sql("select * from t where x = 99").take(10)
CPU times: user 83.9 ms, sys: 112 ms, total: 196 ms
Wall time: 6min 44s
...
In [2]: %time spark.sql("select * from t where x = 99").limit(10).rdd.collect()
CPU times: user 45.7 ms, sys: 73.9 ms, total: 120 ms
Wall time: 3min 59s
...
{code}
I ran the two tests a few times to make sure there was no OS-level caching 
effect; the timings didn't change much.

If I cache the table first, then "take(10)" is faster than 
"limit(10).rdd.collect()".

> DataFrame.take() only uses one worker
> -------------------------------------
>
>                 Key: SPARK-37185
>                 URL: https://issues.apache.org/jira/browse/SPARK-37185
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1, 3.2.0
>         Environment: CentOS 7
>            Reporter: mathieu longtin
>            Priority: Major
>
> Say you have this query:
> {code:python}
> >>> df = spark.sql("select * from mytable where x = 99"){code}
> Now, out of billions of rows, there are only ten rows where x is 99.
> If I do:
> {code:python}
> >>> df.limit(10).collect()
> [Stage 1:>  (0 + 1) / 1]{code}
> It only uses one worker. This takes a really long time since one CPU is 
> reading billions of rows.
> However, if I do this:
> {code:python}
> >>> df.limit(10).rdd.collect()
> [Stage 1:>  (0 + 10) / 22]{code}
> All the workers are running.
> I think there's some optimization issue with DataFrame.take(...).
> This did not use to be an issue, but I'm not sure whether it behaved this 
> way in 3.0 or 2.4.






[jira] [Commented] (SPARK-37185) DataFrame.take() only uses one worker

2021-11-02 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437182#comment-17437182
 ] 

Hyukjin Kwon commented on SPARK-37185:
--------------------------------------

Can you show the perf diff between the two code paths?







[jira] [Commented] (SPARK-37185) DataFrame.take() only uses one worker

2021-11-02 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437180#comment-17437180
 ] 

Hyukjin Kwon commented on SPARK-37185:
--------------------------------------

Isn't it more efficient to use only one partition on one worker if less data 
is required?







[jira] [Commented] (SPARK-37185) DataFrame.take() only uses one worker

2021-11-01 Thread mathieu longtin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437009#comment-17437009
 ] 

mathieu longtin commented on SPARK-37185:
-----------------------------------------

Additional note: if there's a "group by" in the query, this is not an issue.
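
A minimal sketch of what I mean, assuming a table t like the one in the 
timings above:
{code:python}
# Sketch: with an aggregation in the query, take() cannot short-circuit to a
# single partition; the group by forces a full parallel scan and a shuffle
# before any rows exist to return, so all the workers stay busy.
spark.sql("select x, count(*) as n from t group by x").take(10)
{code}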



