Re: [SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

2017-03-08 Thread Herman van Hövell tot Westerflier
You are seeing a bug in the Hive parser. Hive drops the window clause when
it encounters a count(distinct ...). See
https://issues.apache.org/jira/browse/HIVE-10141 for more information.

Spark 1.6 plans this as a regular distinct aggregate (dropping the window
clause), which is wrong. Spark 2.x uses its own parser, and it does not
allow you to use 'distinct' aggregates in window functions. You are
getting this error because aggregates are planned before windows, and the
aggregate cannot find b in its group by expressions.
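
A minimal sketch of two rewrites that avoid a 'distinct' aggregate inside a window, assuming the same three rows as the reproduction. The view alias t, the column alias cnt, the order by clauses, and the names perGroup and perRow are illustrative choices, not part of the original report. Which rewrite fits depends on whether you want one row per group or one row per input row.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("test").master("local").getOrCreate()

// Same data as the reproduction; the alias t is an arbitrary choice.
spark.sql(
  """
    |create or replace temporary view tb as select * from values
    |(1, 0),
    |(1, 0),
    |(2, 0)
    |as t(a, b)
  """.stripMargin)

// Option 1: a regular distinct aggregate, one row per value of a.
// No window clause is involved, so b only appears inside the aggregate.
val perGroup = spark.sql(
  "select a, count(distinct b) as cnt from tb group by a order by a")

// Option 2: keep the window but drop GROUP BY, one row per input row.
// collect_set de-duplicates b within each partition and size counts
// the distinct values (assumption: this is the result that was wanted).
val perRow = spark.sql(
  "select a, size(collect_set(b) over (partition by a)) as cnt from tb order by a")

perGroup.show()
perRow.show()
```

Note that collect_set ignores nulls, as count(distinct ...) does, but it materializes the set per partition, so it may be costly on high-cardinality columns.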

On Wed, Mar 8, 2017 at 5:21 AM, StanZhai wrote:

> We can reproduce this using the following code:
>
> import org.apache.spark.sql.SparkSession
>
> val spark =
>   SparkSession.builder().appName("test").master("local").getOrCreate()
>
> // Create a temporary view tb with columns a and b.
> val sql1 =
>   """
>     |create temporary view tb as select * from values
>     |(1, 0),
>     |(1, 0),
>     |(2, 0)
>     |as grouping(a, b)
>   """.stripMargin
>
> // Combine a distinct window aggregate with GROUP BY.
> val sql =
>   """
>     |select count(distinct(b)) over (partition by a) from tb group by a
>   """.stripMargin
>
> spark.sql(sql1)
> spark.sql(sql).show()
>
> It throws an exception like this:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 
> 'tb.`b`' is neither present in the group by, nor is it an aggregate function. 
> Add to group by or wrap in first() (or first_value) if you don't care which 
> value you get.;;
> Project [count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L]
> +- Project [b#1, a#0, count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L, count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L]
>    +- Window [count(distinct b#1) windowspecdefinition(a#0, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS count(DISTINCT b) OVER (PARTITION BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#4L], [a#0]
>       +- Aggregate [a#0], [b#1, a#0]
>          +- SubqueryAlias tb
>             +- Project [a#0, b#1]
>                +- SubqueryAlias grouping
>                   +- LocalRelation [a#0, b#1]
>
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:220)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:247)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$7.apply(CheckAnalysis.scala:247)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:247)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>
> However, Spark 1.6.x does not throw this exception.
>
> I think the SQL select count(distinct(b)) over (partition by a) from tb
> group by a should execute successfully, and I do not understand the
> exception. Is this the expected behavior?
>
> Any help is appreciated!
>
> Best,
>
> Stan

Re:Re: [SQL]Analysis failed when combining Window function and GROUP BY in Spark2.x

2017-03-08 Thread StanZhai
Thanks for your reply!
I know what's going on now.