Hi Julien,

Although this is an annoying bug in Spark, it is rare to need a window range
larger than Integer.MAX_VALUE.

Nevertheless, most window functions can be expressed as self-joins, so your
problem can be worked around as in the following example.

Given this input data:

+---+-------------+-----+
| id|    timestamp|value|
+---+-------------+-----+
|  B|1000000000000|  100|
|  B|1001000000000|   50|
|  B|1002000000000|  200|
|  B|2500000000000|  500|
+---+-------------+-----+

And the window range is (-2000000000L, 0), i.e. for each row, all rows of the
same id whose timestamp falls within the previous 2000000000 ms:

Then this code gives the desired result:

df.as("df1").join(df.as("df2"),
  $"df2.timestamp" between($"df1.timestamp" - 2000000000L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg( functions.min($"df2.value").as("min___value"))
  .orderBy($"df1.timestamp")
  .show()

+---+-------------+-----+-----------+
| id|    timestamp|value|min___value|
+---+-------------+-----+-----------+
|  B|1000000000000|  100|        100|
|  B|1001000000000|   50|         50|
|  B|1002000000000|  200|         50|
|  B|2500000000000|  500|        500|
+---+-------------+-----+-----------+
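
For completeness, here is a minimal sketch of the setup assumed above (the app
name and variable names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("window-self-join").getOrCreate()
import spark.implicits._   // enables the $"..." column syntax and toDF

// Sample data matching the input table above.
val df = Seq(
  ("B", 1000000000000L, 100),
  ("B", 1001000000000L, 50),
  ("B", 1002000000000L, 200),
  ("B", 2500000000000L, 500)
).toDF("id", "timestamp", "value")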

Or with Spark SQL:

SELECT c.id, c.timestamp, c.value, min(c._value) AS min___value
FROM (
  SELECT a.id, a.timestamp, a.value,
         b.timestamp AS _timestamp, b.value AS _value
  FROM df a JOIN df b
    ON a.id = b.id
   AND b.timestamp BETWEEN a.timestamp - 2000000000L AND a.timestamp
) c
GROUP BY c.id, c.timestamp, c.value
ORDER BY c.timestamp
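
Note that to run the SQL version, the DataFrame must first be registered as a
temporary view; "query" below is just a placeholder for the statement above:

// Register the DataFrame under the name referenced in the FROM clause.
df.createOrReplaceTempView("df")
// query is assumed to hold the SQL string above.
spark.sql(query).show()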


This should also be possible with Spark Streaming, but don't expect high
performance: the self-join compares every row against all candidate rows of
the same id, which is far more expensive than a single window scan.


Cheers,
Radhwane



2017-07-05 10:41 GMT+02:00 Julien CHAMP <jch...@tellmeplus.com>:

> Hi there !
>
> Let me explain my problem to see if you have a good solution to help me :)
>
> Let's imagine that I have all my data in a DB or a file, which I load into a
> dataframe DF with the following columns:
> *id | timestamp(ms) | value*
> A | 1000000 |  100
> A | 1000010 |  50
> B | 1000000 |  100
> B | 1000010 |  50
> B | 1000020 |  200
> B | 2500000 |  500
> C | 1000000 |  200
> C | 1000010 |  500
>
> The timestamp is a *long value*, so that dates can be expressed in ms from
> 0000-01-01 to today!
>
> I want to compute operations such as min, max, and average on the *value
> column* over a given time window, grouped by id (bonus: if possible, for
> only some timestamps...)
>
> For example if I have 3 tuples :
>
> id | timestamp(ms) | value
> B | 1000000 |  100
> B | 1000010 |  50
> B | 1000020 |  200
> B | 2500000 |  500
>
> I would like to compute the min value over a time window of 20 ms.
> This would result in the following DF:
>
> id | timestamp(ms) | value | min___value
> B | 1000000 |  100 | 100
> B | 1000010 |  50  | 50
> B | 1000020 |  200 | 50
> B | 2500000 |  500 | 500
>
> This seems like the perfect use case for window functions in Spark (cf.
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html)
> and I can use:
>
> Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20,0)
> df.withColumn("min___value", min(df.col("value")).over(tw))
>
> This leads to the perfect answer!
>
> However, there is a big bug in window functions when working with Long range
> values, as reported here (https://issues.apache.org/jira/browse/SPARK-19451),
> so I can't use this approach...
>
> So my question is (of course): how can I resolve my problem?
> And if I use Spark Streaming, will I face the same issue?
>
> I'll be glad to discuss this problem with you, feel free to answer :)
>
> Regards,
>
> Julien
> --
>
>
> Julien CHAMP — Data Scientist
>
> Web: www.tellmeplus.com <http://tellmeplus.com/> — Email: jch...@tellmeplus.com
> Phone: 06 89 35 01 89 — LinkedIn: https://www.linkedin.com/in/julienchamp
>
> TellMePlus S.A — Predictive Objects
>
> Paris: 7 rue des Pommerots, 78400 Chatou
> Montpellier: 51 impasse des églantiers, 34980 St Clément de Rivière




-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b
