Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

I have analytics data with timestamps on each element.  I'd like to analyze
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
time-related elements.)

How can this be achieved?


*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


Re: Analyzing consecutive elements

2015-10-22 Thread Dylan Hogg
Hi Sampo,

You could try zipWithIndex followed by a self join with shifted index
values like this:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)

// index every element of the sorted RDD, keyed by its position: (index, element)
val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
// self join against an index-shifted copy to pair element i with element i+1
val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)

Which produces the consecutive elements as pairs in the RDD for further
processing:
(0,((1,A),(3,B)))
(1,((3,B),(7,C)))
(2,((7,C),(8,D)))
(3,((8,D),(9,E)))
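
If you only need the consecutive pairs themselves, you can drop the index
key afterwards:

val consecutive = pairs.map(_._2)  // ((1,A),(3,B)), ((3,B),(7,C)), ...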

There are probably more efficient ways to do this, but if your dataset
isn't too big it should work for you.

Cheers,
Dylan.



Re: Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

Sorry, I'm not very familiar with those methods and cannot find the 'drop'
method anywhere.

As an example:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)
// ... then what?


Thanks.


Best regards,

*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


On Thu, Oct 22, 2015 at 10:43 AM, Adrian Tanase  wrote:

> I'm not sure if there is a better way to do it directly using Spark APIs,
> but I would try to use mapPartitions and then, within each partition's
> Iterable, do:
>
> rdd.zip(rdd.drop(1)) - using the Scala collection APIs
>
> This should give you what you need inside a partition. I'm hoping that you
> can partition your data somehow (e.g. by user id or session id) in a way
> that makes your algorithm parallel. In that case you can use the snippet
> above in a reduceByKey.
>
> hope this helps
> -adrian


Re: Analyzing consecutive elements

2015-10-22 Thread Adrian Tanase
drop is a method on Scala's collections (Array, List, etc.), not on Spark's
RDDs. You can use it inside mapPartitions or something like reduceByKey, but
it totally depends on the use cases you have for analytics.
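
For example, a minimal sketch of the mapPartitions idea (note that it only
pairs consecutive elements within a partition, not across partition
boundaries):

val pairsWithinPartitions = sorted.mapPartitions { iter =>
  val elems = iter.toSeq             // materialize this partition
  elems.zip(elems.drop(1)).iterator  // (e0,e1), (e1,e2), ...
}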

The others have suggested better solutions using only Spark's APIs.

-adrian


RE: Analyzing consecutive elements

2015-10-22 Thread Andrianasolo Fanilo
Hi Sampo,


There is a sliding method you could try in
org.apache.spark.mllib.rdd.RDDFunctions, though it's DeveloperApi stuff
(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions).
Unlike a purely per-partition approach, it also pairs consecutive elements
across partition boundaries.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._  // brings sliding into scope

object Test {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local").setAppName("sandbox")

    val sc = new SparkContext(sparkConf)

    val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
    val rdd = sc.parallelize(arr)
    val sorted = rdd.sortByKey(true)

    // each sliding window of size 2 holds two consecutive elements;
    // map it to a (previous, current) pair
    print(sorted.sliding(2).map(x => (x(0), x(1))).collect().toSeq)

    sc.stop()
  }
}

prints

WrappedArray(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))

Otherwise you could try to convert your RDD to a DataFrame and then use
window functions in Spark SQL, with LEAD/LAG.
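
For instance, a rough sketch using lead (assuming the sorted RDD from the
snippet above; the "ts" and "value" column names are just illustrative, and
in Spark 1.x window functions need a HiveContext):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead
import sqlContext.implicits._

val df = sorted.toDF("ts", "value")
val w = Window.orderBy("ts")  // single global window ordered by timestamp
// lead("value", 1) attaches each row's successor; the last row gets null
val pairs = df.withColumn("next", lead("value", 1).over(w))
  .where($"next".isNotNull)
pairs.show()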



Best regards,

Fanilo





Re: Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

Excellent, the sliding method seems to be just what I'm looking for.  Hope
it becomes part of the stable API; I'd assume there are lots of uses with
time-related data.

Dylan's suggestion seems reasonable as well, if DeveloperApi is not an
option.

Thanks!


Best regards,

*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291

