EnricoMi opened a new pull request, #39754:
URL: https://github.com/apache/spark/pull/39754

   ### What changes were proposed in this pull request?
   Introduces `ScopedExpression`, which resolves an expression against a given 
set of attributes. It is used by `Dataset.groupByKey.agg`, 
`Dataset.groupByKey.flatMapSortedGroups`, and 
`Dataset.groupByKey.cogroupSorted` to fix surprising error messages and to make 
`Dataset.groupByKey.mapValues.agg` work at all.
   
   ### Why are the changes needed?
   Calling `ds.groupByKey(func: V => K)` adds columns that store the key 
value. These columns may conflict with columns that already exist in `ds`. 
`Dataset.groupByKey.agg` accounts for this with a very specific rule, 
which has some surprising weaknesses:
   
   ```Scala
   spark.range(1)
     // groupByKey adds column 'value'
     .groupByKey(id => id)
     // which cannot be referenced, though it is suggested
     .agg(count("value"))
   ```
   ```
   org.apache.spark.sql.AnalysisException: Column 'value' does not exist.
   Did you mean one of the following? [value, id];
   ```
   
   An existing 'value' column can be referenced:
   
   ```Scala
   // dataset with column 'value'
   spark.range(1).select($"id".as("value")).as[Long]
     // groupByKey adds another column 'value'
     .groupByKey(id => id)
     // agg accounts for the extra column and excludes it when resolving 'value'
     .agg(count("value"))
     .show()
   ```
   ```
   +---+------------+
   |key|count(value)|
   +---+------------+
   |  0|           1|
   +---+------------+
   ```
   
   However, the column suggestion then lists both 'value' columns:
   
   ```Scala
   spark.range(1).select($"id".as("value")).as[Long]
     .groupByKey(id => id)
     .agg(count("unknown"))
   ```
   ```
   org.apache.spark.sql.AnalysisException: Column 'unknown' does not exist.
   Did you mean one of the following? [value, value]
   ```
   
   Furthermore, `mapValues` introduces another 'value' column, which should be 
referenceable, but it breaks the exclusion introduced by `agg`:
   
   ```Scala
   spark.range(1)
     // groupByKey adds column 'value'
     .groupByKey(id => id)
     // adds another 'value' column
     .mapValues(value => value)
     // which cannot be referenced in agg
     .agg(count("value"))
   ```
   ```
   org.apache.spark.sql.AnalysisException: Reference 'value' is ambiguous, 
could be: value, value.
   ```
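   
   The idea of resolving a name against a restricted set of attributes can be 
illustrated with a toy model. This is only a sketch, not Spark's analyzer and 
not the `ScopedExpression` implementation in this PR; `Attr`, `resolve`, and 
the attribute ids are hypothetical names made up for illustration.
   
   ```Scala
   // Toy model of name resolution. NOT Spark's analyzer or this PR's ScopedExpression.
   object ScopedResolutionSketch {
     // A column name plus a unique id, so columns with equal names stay distinguishable.
     final case class Attr(name: String, id: Int)

     // Resolve `name` against exactly the attributes in `scope`.
     def resolve(name: String, scope: Seq[Attr]): Either[String, Attr] =
       scope.filter(_.name == name) match {
         case Seq(single) => Right(single)
         case Seq() =>
           Left(s"Column '$name' does not exist. Did you mean one of the following? " +
             scope.map(_.name).mkString("[", ", ", "]"))
         case many =>
           Left(s"Reference '$name' is ambiguous, could be: " +
             many.map(_.name).mkString(", ") + ".")
       }

     // Hypothetical attributes after groupByKey(...).mapValues(...):
     val key = Attr("value", 1)         // key column added by groupByKey
     val mappedValue = Attr("value", 2) // value column added by mapValues

     // Resolving against every attribute is ambiguous ...
     val unscoped = resolve("value", Seq(key, mappedValue))
     // ... while resolving against the value attributes only succeeds.
     val scoped = resolve("value", Seq(mappedValue))
   }
   ```
   
   In this model, narrowing the scope both removes the ambiguity and keeps the 
key column out of the "Did you mean" suggestions.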
   
   ### Does this PR introduce _any_ user-facing change?
   Fixes the suggestions in the error messages above, from
   ```
   ... Did you mean one of the following? [value, value]
   ... Did you mean one of the following? [value, id];
   ```
   to
   ```
   ... Did you mean one of the following? [value]
   ... Did you mean one of the following? [id];
   ```
   
   Makes `Dataset.groupByKey.mapValues.agg` work.
   
   ### How was this patch tested?
   Tests in `DatasetSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

