This looks like a bug; go ahead and open a JIRA for it, thanks!

On Tue, Aug 11, 2015 at 6:41 AM, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
> Hello everyone,
>
> I am trying to use the PySpark API with window functions without
> specifying a partition clause. I mean something equivalent to this
>
> SELECT v, row_number() OVER (ORDER BY v) AS rn FROM df
>
> in SQL. I am not sure if I am doing something wrong or if it is a bug,
> but the results are far from what I expect. Let's assume we have data
> as follows:
>
> from pyspark.sql.window import Window
> from pyspark.sql import functions as f
>
> df = sqlContext.createDataFrame(
>     zip(["foo"] * 5 + ["bar"] * 5, range(1, 6) + range(6, 11)),
>     ("k", "v")
> ).withColumn("dummy", f.lit(1))
>
> df.registerTempTable("df")
> df.show()
>
> +---+--+-----+
> |  k| v|dummy|
> +---+--+-----+
> |foo| 1|    1|
> |foo| 2|    1|
> |foo| 3|    1|
> |foo| 4|    1|
> |foo| 5|    1|
> |bar| 6|    1|
> |bar| 7|    1|
> |bar| 8|    1|
> |bar| 9|    1|
> |bar|10|    1|
> +---+--+-----+
>
> When I use the following SQL query
>
> sql_ord = """SELECT k, v, row_number() OVER (
>     ORDER BY v
> ) AS rn FROM df"""
>
> sqlContext.sql(sql_ord).show()
>
> I get the expected results:
>
> +---+--+--+
> |  k| v|rn|
> +---+--+--+
> |foo| 1| 1|
> |foo| 2| 2|
> |foo| 3| 3|
> |foo| 4| 4|
> |foo| 5| 5|
> |bar| 6| 6|
> |bar| 7| 7|
> |bar| 8| 8|
> |bar| 9| 9|
> |bar|10|10|
> +---+--+--+
>
> but when I try to define a similar thing using the Python API
>
> w_ord = Window.orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_ord).alias("avg")).show()
>
> I get results like this:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  1|
> |foo| 3|  1|
> |foo| 4|  1|
> |foo| 5|  1|
> |bar| 6|  1|
> |bar| 7|  1|
> |bar| 8|  1|
> |bar| 9|  1|
> |bar|10|  1|
> +---+--+---+
>
> When I specify both partition and order
>
> w_part_ord = Window.partitionBy("dummy").orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_part_ord).alias("avg")).show()
>
> everything works as I expect:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  2|
> |foo| 3|  3|
> |foo| 4|  4|
> |foo| 5|  5|
> |bar| 6|  6|
> |bar| 7|  7|
> |bar| 8|  8|
> |bar| 9|  9|
> |bar|10| 10|
> +---+--+---+
>
> Another example of similar behavior, with the correct SQL result:
>
> sql_ord_rng = """SELECT k, v, avg(v) OVER (
>     ORDER BY v
>     ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
> ) AS avg FROM df"""
> sqlContext.sql(sql_ord_rng).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> and the incorrect PySpark result:
>
> w_ord_rng = Window.orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_ord_rng).alias("avg")).show()
>
> +---+--+----+
> |  k| v| avg|
> +---+--+----+
> |foo| 1| 1.0|
> |foo| 2| 2.0|
> |foo| 3| 3.0|
> |foo| 4| 4.0|
> |foo| 5| 5.0|
> |bar| 6| 6.0|
> |bar| 7| 7.0|
> |bar| 8| 8.0|
> |bar| 9| 9.0|
> |bar|10|10.0|
> +---+--+----+
>
> As before, adding a dummy partition solves the problem:
>
> w_part_ord_rng = Window.partitionBy("dummy").orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_part_ord_rng).alias("avg")).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> I've checked the window function tests
> (https://github.com/apache/spark/blob/ac507a03c3371cd5404ca195ee0ba0306badfc23/python/pyspark/sql/tests.py#L1105)
> but these cover only the partition + order case.
>
> Is there something wrong with my window definitions, or should I open
> a JIRA issue?
>
> Environment:
>
> - Debian GNU/Linux
> - Spark 1.4.1
> - Python 2.7.9
> - OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-1~deb8u1)
>
> --
> Best,
> Maciej
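
For anyone hitting this in the meantime, the dummy-partition workaround above condenses into the self-contained script below. This is a sketch against the Spark 1.4.x / Python 2 API used in the report, not a fix: rowNumber() is the 1.4-era function name (later releases expose it as row_number()), and the appName is arbitrary.

# Workaround sketch: ordered window over the whole DataFrame via a
# constant partition column (Spark 1.4.x / Python 2, as reported above).
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from pyspark.sql.window import Window

sc = SparkContext(appName="window-workaround")  # hypothetical app name
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(
    zip(["foo"] * 5 + ["bar"] * 5, range(1, 6) + range(6, 11)),
    ("k", "v")
)

# Partition on a constant so the ordered frame spans every row. Caveat:
# this funnels all rows through a single partition, which is the same
# scalability cost as any global ORDER BY.
w = Window.partitionBy("dummy").orderBy("v")
df.withColumn("dummy", f.lit(1)) \
  .select("k", "v", f.rowNumber().over(w).alias("rn")) \
  .show()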