Hello all,

I'm experimenting with Spark 1.4.1 window functions
and have run into a problem in pySpark that I've also described in a
Stack Overflow question:
<http://stackoverflow.com/questions/32376713/spark-window-functions-dont-work-as-expected>

In essence, the following

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
df.select(df.a, func.lag(df.b, 1).over(wSpec).alias("prev"), df.b,
          func.lead(df.b, 1).over(wSpec).alias("next")).collect()

does not work in pySpark: the first collect() raises an exception, and the
second one returns only None from the window functions.

Meanwhile, the same example in Spark/Scala works fine:

val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"),
lead(df("b"),1).over(wSpec).alias("next"))

Am I doing anything wrong, or is this indeed a pySpark issue?


Best Regards,
Sergey

PS: Here is the full pySpark shell example:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ==> Failure: org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.
df.select(df.a, func.lag(df.b, 1).over(wSpec).alias("prev"), df.b,
          func.lead(df.b, 1).over(wSpec).alias("next"))
# ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;
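# Both errors above read (my assumption, not confirmed in the docs) as if
# ranking/offset functions simply reject an explicit frame, so the next
# attempt drops rowsBetween: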


wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b, 1).over(wSpec).alias("prev"), df.b,
          func.lead(df.b, 1).over(wSpec).alias("next")).collect()
# [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None),
#  Row(a=3, prev=None, b=303, next=None)]
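
For completeness, my reading of the errors above (an assumption on my part,
I have not found this documented) is that a frame specification is meant only
for aggregate window functions, so this sketch is the one combination where
I'd expect rowsBetween to be legal:

wSpecFramed = Window.orderBy(df.a).rowsBetween(-1, 1)
df.select(df.a, func.avg(df.b).over(wSpecFramed).alias("avg_b")).collect()
# If that reading is right, this should give a centered moving average,
# e.g. avg_b=151.5 for a=1 and avg_b=202.0 for a=2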
