[ https://issues.apache.org/jira/browse/SPARK-29266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-29266:
-------------------------------
    Description:

SPARK-23627 added a {{Dataset.isEmpty}} method. It is currently implemented as

{code:java}
def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0) == 0
}
{code}

which embeds a global limit of 1 in the middle of the query plan. As a result, this ends up computing *all* partitions of the Dataset, although each task can stop early once it has produced a single record.

We could instead implement this as {{ds.limit(1).collect().isEmpty}}, but that goes through the "CollectLimit" execution strategy, which first computes 1 partition, then 2, then 4, and so on. That will be faster in some cases but slower in others: if the dataset is indeed empty, that method is slower than one which checks all partitions in parallel, but if it is non-empty (and most tasks' output is non-empty) then it can be much faster.

There's not an obviously-best implementation here. However, I think there's high value (and low downside) in optimizing for the special case where the Dataset is an unfiltered, untransformed input dataset (e.g. the output of {{spark.read.parquet}}): I found a production job which calls {{isEmpty}} on the output of {{spark.read.parquet()}}, and the {{isEmpty}} call took several minutes to complete because it needed to launch hundreds of thousands of tasks to compute a single record from each partition (this is an enormous dataset).

I could instruct the job author to use a different, more efficient method of checking for non-emptiness, but this feels like the type of optimization that Spark should handle itself.

Maybe we can special-case {{isEmpty}} for the case where the plan consists of only a file source scan (or a file source scan followed by a projection, but without any filters, etc.). In those cases, we could use either the {{.limit(1).take()}} implementation (under the assumption that we don't have a ton of empty input files) or something fancier (a metadata-only query, looking at Parquet footers, delegating to some datasource API, etc.).
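For illustration only, here is a rough sketch of what that special-casing might look like, written as a standalone helper rather than as a change inside {{Dataset.isEmpty}} itself. The helper name and the exact plan shapes matched are assumptions for the sketch, and it relies on Spark's internal {{LogicalRelation}} / {{HadoopFsRelation}} classes, which are not stable API:

{code:scala}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Hypothetical sketch, not the actual Spark implementation.
object IsEmptyOptimization {

  // True if the analyzed plan is just a file source scan, optionally under a projection.
  private def isBareFileScan(plan: LogicalPlan): Boolean = plan match {
    case rel: LogicalRelation => rel.relation.isInstanceOf[HadoopFsRelation]
    case Project(_, child)    => isBareFileScan(child)
    case _                    => false
  }

  def isEmptyFast[T](ds: Dataset[T]): Boolean = {
    if (isBareFileScan(ds.queryExecution.analyzed)) {
      // Cheap path: take(1) goes through CollectLimit, which scans 1 partition,
      // then 2, then 4, ... and usually stops after the first partition.
      // Assumes the input does not consist mostly of empty files/partitions.
      ds.take(1).isEmpty
    } else {
      // Fall back to the existing implementation, which checks all partitions in parallel.
      ds.isEmpty
    }
  }
}
{code}

A real change would live inside {{Dataset.isEmpty}} itself and would need to decide which plan shapes are safe to handle this way (or use one of the fancier metadata-based approaches mentioned above).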
> Optimize Dataset.isEmpty for base relations / unfiltered datasets
> -----------------------------------------------------------------
>
>                 Key: SPARK-29266
>                 URL: https://issues.apache.org/jira/browse/SPARK-29266
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Josh Rosen
>            Priority: Minor