Hi,

Excellent, the sliding method seems to be just what I'm looking for. I hope
it becomes part of the stable API; I'd expect there to be lots of uses for it
with time-related data.

Dylan's suggestion seems reasonable as well, if DeveloperApi is not an
option.

Thanks!


Best regards,

*    Sampo Niskanen*

*Lead developer / Wellmo*
    sampo.niska...@wellmo.com
    +358 40 820 5291


On Thu, Oct 22, 2015 at 3:51 PM, Andrianasolo Fanilo <
fanilo.andrianas...@worldline.com> wrote:

> Hi Sampo,
>
>
>
> There is a sliding method you could try in
> org.apache.spark.mllib.rdd.RDDFunctions, though it is a DeveloperApi
> (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions):
>
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.mllib.rdd.RDDFunctions._
>
> object Test {
>
>   def main(args: Array[String]): Unit = {
>     val sparkConf = new SparkConf()
>     sparkConf.setMaster("local").setAppName("sandbox")
>
>     val sc = new SparkContext(sparkConf)
>
>     val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
>     val rdd = sc.parallelize(arr)
>     val sorted = rdd.sortByKey(true)
>
>     print(sorted.sliding(2).map(x => (x(0), x(1))).collect().toSeq)
>
>     sc.stop()
>   }
> }
>
>
>
> prints
>
>
>
> WrappedArray(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))
>
>
>
> Otherwise, you could convert your RDD to a DataFrame and then use the
> LEAD/LAG window functions in Spark SQL.
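For the LEAD/LAG route, here is a minimal sketch. It assumes Spark 2.x or later (SparkSession and built-in window functions); in the Spark 1.x releases current when this thread was written, window functions required a HiveContext. The column names `ts` and `event` and the object name `ConsecutiveWithWindow` are hypothetical placeholders for your own schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

object ConsecutiveWithWindow {
  // Adds the previous and next event (ordered by timestamp) as extra columns.
  // "ts" and "event" are hypothetical column names; adapt them to your schema.
  def withNeighbours(df: DataFrame): DataFrame = {
    // No partitionBy: Spark evaluates this window in a single partition.
    val byTime = Window.orderBy(col("ts"))
    df.withColumn("prevEvent", lag(col("event"), 1).over(byTime))
      .withColumn("nextEvent", lead(col("event"), 1).over(byTime))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("sandbox").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E")).toDF("ts", "event")
    // The first row gets a null prevEvent and the last row a null nextEvent.
    withNeighbours(df).orderBy("ts").show()

    spark.stop()
  }
}
```

Because the window has no partitionBy, Spark moves all rows into a single partition to evaluate it, so this only suits modest data sizes, or cases where you can partition the window by something like a user ID.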
>
>
>
> Best regards,
>
> Fanilo
>
>
>
>
>
> *From:* Dylan Hogg [mailto:dylanh...@gmail.com]
> *Sent:* Thursday, 22 October 2015 13:44
> *To:* Sampo Niskanen
> *Cc:* user
> *Subject:* Re: Analyzing consecutive elements
>
>
>
> Hi Sampo,
>
> You could try zipWithIndex followed by a self-join on shifted index
> values, like this:
>
> val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
> val rdd = sc.parallelize(arr)
> val sorted = rdd.sortByKey(true)
>
> // Key each element by its position: (index, element)
> val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
> // Shift indices down by one so element i + 1 is keyed by i, then join
> val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)
>
> This produces the consecutive elements as pairs in the RDD for further
> processing:
> (0,((1,A),(3,B)))
> (1,((3,B),(7,C)))
> (2,((7,C),(8,D)))
> (3,((8,D),(9,E)))
>
> There are probably more efficient ways to do this (the join and the final
> sortBy each shuffle the data), but if your dataset isn't too big it should
> work for you.
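If the join turns out to be too expensive, one alternative that uses only core RDD operations is to stitch the partition boundaries together yourself: collect the first element of each partition to the driver, broadcast those, and append to each partition the first element of the next non-empty partition before sliding over it locally. This is a minimal sketch, assuming the input RDD is already sorted (for example by sortByKey) so that partition order matches element order; the object name `ConsecutivePairs` is hypothetical.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object ConsecutivePairs {
  // Pairs each element with its successor, assuming the RDD is already sorted
  // so that everything in partition i precedes everything in partition i + 1.
  def pairs[T: ClassTag](rdd: RDD[T]): RDD[(T, T)] = {
    // Pass 1: bring only the first element of each partition to the driver
    // (None for empty partitions).
    val heads: Array[Option[T]] = rdd
      .mapPartitions(it => Iterator(if (it.hasNext) Some(it.next()) else None))
      .collect()

    // The successor of partition i's last element is the head of the next
    // non-empty partition (None for the final one).
    val nextHeads: Vector[Option[T]] =
      heads.indices.map(i => heads.drop(i + 1).collectFirst { case Some(x) => x }).toVector
    val bc = rdd.sparkContext.broadcast(nextHeads)

    // Pass 2: append that successor to each partition and slide a window of 2.
    rdd.mapPartitionsWithIndex { (idx, it) =>
      (it ++ bc.value(idx).iterator)
        .sliding(2)
        .withPartial(false) // drop a trailing window with fewer than 2 elements
        .map(w => (w(0), w(1)))
    }
  }
}
```

This reads the data twice and moves only one element per partition to the driver instead of shuffling everything; caching the sorted RDD first avoids recomputing the sort for the second pass.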
>
> Cheers,
>
> Dylan.
>
>
>
>
>
> On 22 October 2015 at 17:35, Sampo Niskanen <sampo.niska...@wellmo.com>
> wrote:
>
> Hi,
>
>
>
> I have analytics data with timestamps on each element.  I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
>
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
> time-related elements.)
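On an ordinary local Scala collection, the transform described here is just the sequence zipped with its own tail, which can serve as a reference to check any distributed version against. This is a minimal plain-Scala sketch (no Spark); `LocalConsecutive` and `gaps` are hypothetical names, and `gaps` assumes the elements are timestamps stored as Longs.

```scala
object LocalConsecutive {
  // Pairs each element of a (sorted) local Seq with its successor:
  // Seq(A, B, C, D, E) becomes Seq((A, B), (B, C), (C, D), (D, E)).
  def consecutivePairs[T](xs: Seq[T]): Seq[(T, T)] =
    if (xs.isEmpty) Seq.empty else xs.zip(xs.tail) // tail throws on an empty Seq

  // One possible time-related analysis: the gap between consecutive timestamps.
  def gaps(timestamps: Seq[Long]): Seq[Long] =
    consecutivePairs(timestamps).map { case (earlier, later) => later - earlier }

  def main(args: Array[String]): Unit = {
    println(consecutivePairs(Seq("A", "B", "C", "D", "E")))
    println(gaps(Seq(1L, 3L, 7L, 8L, 9L)))
  }
}
```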
>
>
>
> How can this be achieved?
>
>
>
>
> *    Sampo Niskanen*
> *    Lead developer / Wellmo*
>
>     sampo.niska...@wellmo.com
>     +358 40 820 5291
>
>
>
>
> ------------------------------
>
> This e-mail and the documents attached are confidential and intended
> solely for the addressee; it may also be privileged. If you receive this
> e-mail in error, please notify the sender immediately and destroy it. As
> its integrity cannot be secured on the Internet, the Worldline liability
> cannot be triggered for the message content. Although the sender endeavours
> to maintain a computer virus-free network, the sender does not warrant that
> this transmission is virus-free and will not be liable for any damages
> resulting from any virus transmitted.
>
