[ https://issues.apache.org/jira/browse/SPARK-29266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-29266:
-------------------------------
    Description: 
SPARK-23627 added a {{Dataset.isEmpty}} method. This is currently implemented as
{code:scala}
def isEmpty: Boolean =
  withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0) == 0
  }
{code}
which has a global limit of 1 embedded in the middle of the query plan.

As a result, this ends up computing *all* partitions of the Dataset, although 
each task can stop early once it has produced a single record.
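
For illustration, here's a minimal way to look at the plan this produces 
(assuming a {{spark}} session is in scope; the input path is hypothetical):
{code:scala}
// Illustration only: the path below is a placeholder, not a real dataset.
val ds = spark.read.parquet("/path/to/some/table")

// This is the query that Dataset.isEmpty currently runs under the hood.
ds.limit(1).groupBy().count().explain()
// The limit ends up below the final count aggregate, so a task is still
// scheduled for every input partition; each task merely stops early after
// producing its first record.
{code}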

We could instead implement this as {{ds.limit(1).collect().isEmpty}}, but that 
goes through the {{CollectLimit}} execution strategy, which evaluates 
partitions incrementally: it tries 1 partition first, then progressively more 
(scaled up by {{spark.sql.limit.scaleUpFactor}}) until the limit is satisfied. 
That will be faster in some cases but slower in others: if the dataset is 
indeed empty, this method will be slower than one that checks all partitions 
in parallel, but if it's non-empty (and most tasks' output is non-empty) then 
it can be much faster.
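
For concreteness, a minimal sketch of that alternative (this is not Spark's 
current implementation; the function name is purely illustrative):
{code:scala}
import org.apache.spark.sql.Dataset

// Sketch only: an isEmpty built on the CollectLimit path instead of an aggregate.
def isEmptyViaCollectLimit(ds: Dataset[_]): Boolean =
  ds.limit(1).collect().isEmpty
// limit(1).collect() tries partitions incrementally (1 first, then more, scaled
// by spark.sql.limit.scaleUpFactor), so it is cheap when early partitions are
// non-empty but can be slow and sequential when many partitions are empty.
{code}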

There's no obviously best implementation here. However, I think there's 
high value (and low downside) in optimizing for the special case where the 
Dataset is an unfiltered, untransformed input dataset (e.g. the output of 
{{spark.read.parquet}}):

I found a production job which calls {{isEmpty}} on the output of 
{{spark.read.parquet()}}, and the {{isEmpty}} call took several minutes to 
complete because it needed to launch hundreds of thousands of tasks to compute 
a single record from each partition (this is an enormous dataset).

I could instruct the job author to use a different, more efficient method of 
checking for non-emptiness, but this feels like the type of optimization that 
Spark should handle itself.

Maybe we can special-case {{isEmpty}} for the case where the plan consists only 
of a file source scan (or a file source scan followed by a projection, but 
without any filters, etc.). In those cases, we could use either the 
{{ds.limit(1).collect().isEmpty}} implementation (under the assumption that we 
don't have a ton of empty input files) or something fancier (a metadata-only 
query, looking at Parquet footers, delegating to some datasource API, etc.).
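
As a very rough sketch of what such a special case might look like (the object 
and helper names are hypothetical, it relies on internal 
{{execution.datasources}} classes, and it only illustrates the plan check plus 
the {{limit(1).collect()}} fast path):
{code:scala}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Hypothetical helpers, not part of any Spark API.
object IsEmptySketch {
  // True if the analyzed plan is just a (v1) file-source relation, optionally
  // under a projection, with no filters or other operators in between.
  private def isUnfilteredScan(plan: LogicalPlan): Boolean = plan match {
    case _: LogicalRelation => true
    case Project(_, child)  => isUnfilteredScan(child)
    case _                  => false
  }

  def isEmptyFast(ds: Dataset[_]): Boolean =
    if (isUnfilteredScan(ds.queryExecution.analyzed)) {
      // For a plain scan, the incremental limit(1) check is usually cheap,
      // assuming we don't have a ton of empty input files.
      ds.limit(1).collect().isEmpty
    } else {
      // Fall back to the existing Dataset.isEmpty behavior.
      ds.isEmpty
    }
}
{code}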

 


> Optimize Dataset.isEmpty for base relations / unfiltered datasets
> -----------------------------------------------------------------
>
>                 Key: SPARK-29266
>                 URL: https://issues.apache.org/jira/browse/SPARK-29266
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Josh Rosen
>            Priority: Minor
>


