[ https://issues.apache.org/jira/browse/SPARK-27466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16834258#comment-16834258 ]

Bruce Robbins commented on SPARK-27466:
---------------------------------------

This _seems_ to be intentional, according to SPARK-8641 ("Native Spark Window 
Functions"), which states:
{quote}LEAD and LAG are not aggregates. These expressions return the value of 
an expression a number of rows before (LAG) or ahead (LEAD) of the current row. 
These expressions put a constraint on the Window frame in which they are 
executed: this can only be a Row frame with equal offsets.
{quote}
I guess it depends on what "equal offsets" means. Does it mean that the offsets 
specified for PRECEDING and FOLLOWING need to match each other? Or that they 
need to match the offset passed to the LEAD or LAG function? Based on a quick 
experiment, it seems to be the latter: the frame offsets need to match LEAD or 
LAG's offset.

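(Side note: the windowtest table used in the examples below isn't defined 
anywhere in this ticket. The following is a hypothetical reconstruction that 
reproduces the sample output when pasted into spark-shell; the rows beyond 
those visible in the truncated output are my guess.)
{noformat}
scala> val rows = for { c <- Seq(1, 6); a <- c to c + 40 by 10 } yield (c, a + 1, a)
scala> rows.toDF("c", "b", "a").createOrReplaceTempView("windowtest")
{noformat}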
E.g.,
{noformat}
scala> sql("select c, b, a, lead(a, 1) over(partition by c order by a ROWS 
BETWEEN 1 following AND 1 following) as a_avg from windowtest").show(1000)
+---+---+---+-----+
|  c|  b|  a|a_avg|
+---+---+---+-----+
|  1|  2|  1|   11|
|  1| 12| 11|   21|
|  1| 22| 21|   31|
|  1| 32| 31|   41|
|  1| 42| 41| null|
|  6|  7|  6|   16|
|  6| 17| 16|   26|
|  6| 27| 26|   36|
...etc...
{noformat}
And also
{noformat}
scala> sql("select c, b, a, lead(a, 2) over(partition by c order by a ROWS 
BETWEEN 2 following AND 2 following) as a_avg from windowtest").show(1000)
+---+---+---+-----+
|  c|  b|  a|a_avg|
+---+---+---+-----+
|  1|  2|  1|   21|
|  1| 12| 11|   31|
|  1| 22| 21|   41|
|  1| 32| 31| null|
|  1| 42| 41| null|
|  6|  7|  6|   26|
|  6| 17| 16|   36|
|  6| 27| 26|   46|
...etc...
{noformat}
But not the following:
{noformat}
scala> sql("select c, b, a, lead(a, 1) over(partition by c order by a ROWS 
BETWEEN 2 following AND 2 following) as a_avg from windowtest").show(1000)
org.apache.spark.sql.AnalysisException: Window Frame 
specifiedwindowframe(RowFrame, 2, 2) must match the required frame 
specifiedwindowframe(RowFrame, 1, 1);
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:43)
{noformat}
I suppose [~hvanhovell] or [~yhuai] would understand better (having implemented 
the native versions of these functions).
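Incidentally, the check only bites when a frame is spelled out explicitly. When 
the frame clause is left off, the analyzer's ResolveWindowFrame rule (visible 
in the reporter's stack trace below) fills in exactly the frame that LEAD/LAG 
requires, so a sketch like the following (against the same assumed windowtest 
table as above) should analyze fine and return the same rows as the first 
example:
{noformat}
scala> sql("select c, b, a, lead(a, 1) over(partition by c order by a) as a_next from windowtest").show(1000)
{noformat}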

> LEAD function with 'ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING' causes exception in Spark
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27466
>                 URL: https://issues.apache.org/jira/browse/SPARK-27466
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell
>    Affects Versions: 2.2.0
>         Environment: Spark version 2.2.0.2.6.4.92-2
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
>            Reporter: Zoltan
>            Priority: Major
>
> *1. Create a table in Hive:*
>   
> {code:java}
>  CREATE TABLE tab1(
>    col1 varchar(1),
>    col2 varchar(1)
>   )
>  PARTITIONED BY (
>    col3 varchar(1)
>  )
>  LOCATION
>    'hdfs://server1/data/tab1'
> {code}
>  
>  *2. Query the Table in Spark:*
> *2.1: Simple query, no exception thrown:*
> {code:java}
> scala> spark.sql("SELECT * from schema1.tab1").show()
> +----+----+----+
> |col1|col2|col3|
> +----+----+----+
> +----+----+----+
> {code}
> *2.2: Query causing exception:*
> {code:java}
> scala> spark.sql("*SELECT (LEAD(col1) OVER ( PARTITION BY col3 ORDER BY col1 
> ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING*)) from 
> schema1.tab1")
> {code}
> {code:java}
> org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING;
>    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2219)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30$$anonfun$applyOrElse$11.applyOrElse(Analyzer.scala:2215)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>    at scala.collection.immutable.List.foreach(List.scala:381)
>    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>    at scala.collection.immutable.List.map(List.scala:285)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
>    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2215)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$$anonfun$apply$30.applyOrElse(Analyzer.scala:2214)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$.apply(Analyzer.scala:2214)
>    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame$.apply(Analyzer.scala:2213)
>    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>    at scala.collection.immutable.List.foldLeft(List.scala:84)
>    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>    at scala.collection.immutable.List.foreach(List.scala:381)
>    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
>    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
>    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
>    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
>    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
>    ... 48 elided
> {code}
>  
> *3. The same query in Hive, no exception:*
>   
> {code:java}
>  Beeline version 1.2.1000.2.6.4.92-2 by Apache Hive
>  0: jdbc:hive2://server1> SELECT (LEAD(col1) OVER ( PARTITION BY col3 ORDER BY col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) from schema1.tab1;
>  INFO  : Tez session hasn't been created yet. Opening session
>  INFO  : Dag name: SELECT (LEAD(col1) OV...schema1.tab1(Stage-1)
>  INFO  : Status: Running (Executing on YARN cluster with App id application_1554824808741_2080)
> --------------------------------------------------------------------------------
>          VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> --------------------------------------------------------------------------------
>  Map 1              SUCCEEDED      0          0        0        0       0       0
>  Reducer 2 ......   SUCCEEDED      2          2        0        0       0       0
> --------------------------------------------------------------------------------
>  VERTICES: 01/02  [==========================>>] 100%  ELAPSED TIME: 3.99 s
> --------------------------------------------------------------------------------
>  lead_window_0
>  No rows selected (13.108 seconds)
> {code}