Re: FlatMapValues

2015-01-05 Thread Sanjay Subramanian
cool let me adapt that. thanks a ton
regards
sanjay
  From: Sean Owen 
 To: Sanjay Subramanian  
Cc: "user@spark.apache.org"  
 Sent: Monday, January 5, 2015 3:19 AM
 Subject: Re: FlatMapValues
   
For the record, the solution I was suggesting was about like this:

inputRDD.flatMap { input =>
  val tokens = input.split(',')
  val id = tokens(0)
  val keyValuePairs = tokens.tail.grouped(2)
  val keys = keyValuePairs.map(_(0))
  keys.map(key => (id, key))
}

This is much more efficient.
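The per-line logic of that snippet is plain Scala, so it can be exercised outside Spark. A minimal runnable sketch (the sample record is a hypothetical VAERS-style line in the format discussed later in the thread, where values alternate with version numbers):

```scala
// Per-line logic of the flatMap solution, runnable without Spark.
// Fields after the id alternate value,version (e.g. "Delirium,8.10"),
// so grouped(2) pairs each value with its version and _(0) keeps the value.
def explode(lines: Seq[String]): Seq[(String, String)] =
  lines.flatMap { input =>
    val tokens = input.split(',')
    val id = tokens(0)
    val keyValuePairs = tokens.tail.grouped(2)
    val keys = keyValuePairs.map(_(0))
    keys.map(key => (id, key)).toSeq
  }

val sample = Seq("025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10")
explode(sample).foreach { case (id, v) => println(s"$id,$v") }
```

On a real RDD the same function body works unchanged inside `inputRDD.flatMap { ... }`.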



On Wed, Dec 31, 2014 at 3:46 PM, Sean Owen  wrote:
> From the clarification below, the problem is that you are calling
> flatMapValues, which is only available on an RDD of key-value tuples.
> Your map function returns a tuple in one case but a String in the
> other, so your RDD is a bunch of Any, which is not at all what you
> want. You need to return a tuple in both cases, which is what Kapil
> pointed out.
>
> However it's still not quite what you want. Your input is basically
> [key value1 value2 value3] so you want to flatMap that to (key,value1)
> (key,value2) (key,value3). flatMapValues does not come into play.
>
> On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian
>  wrote:
>> My understanding is as follows
>>
>> STEP 1 (This would create a pair RDD)
>> ===
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>   }
>>   else {
>>     ""
>>   }
>> })
>>
>> STEP 2
>> ===
>> Since previous step created a pair RDD, I thought flatMapValues method will
>> be applicable.
>> But the code does not even compile saying that flatMapValues is not
>> applicable to RDD :-(
>>
>>
>> reacRdd.map(line => line.split(',')).map(fields => {
>>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>>   }
>>   else {
>>     ""
>>   }
>> }).flatMapValues(skus =>
>>   skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>>
>>
>> SUMMARY
>> ===
>> when a dataset looks like the following
>>
>> 1,red,blue,green
>> 2,yellow,violet,pink
>>
>> I want to output the following and I am asking how do I do that ? Perhaps my
>> code is 100% wrong. Please correct me and educate me :-)
>>
>> 1,red
>> 1,blue
>> 1,green
>> 2,yellow
>> 2,violet
>> 2,pink


  

Re: FlatMapValues

2015-01-05 Thread Sean Owen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FlatMapValues

2015-01-02 Thread Sanjay Subramanian
OK this is how I solved it. Not elegant at all but it works and I need to move
ahead at this time. Converting to a pair RDD is now not required.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
    (fields(0)+","+fields(1)+"\t"+fields(0)+","+fields(3)+"\t"+fields(0)+","+fields(5)+"\t"+fields(0)+","+fields(7)+"\t"+fields(0)+","+fields(9))
  }
  else {
    ""
  }
}).flatMap(str => str.split('\t')).filter(line =>
  line.length() > 0).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
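That string-join/split/filter trick can be checked with plain collections. The `tail.map` below generalizes the hard-coded field indices and is only an illustrative assumption; the shape of the pipeline (map to tab-joined "id,value" strings, flatMap on the tab split, filter out the "" sentinel) is the same:

```scala
// Plain-Scala sketch of the solved approach, runnable without Spark.
val data = Seq("1,red,blue,green", "2,yellow,violet,pink", "bad,row")
val out = data
  .map(_.split(','))
  .map { fields =>
    if (fields.length >= 4)
      fields.tail.map(v => fields(0) + "," + v).mkString("\t")  // "1,red\t1,blue\t1,green"
    else
      ""                                   // sentinel for malformed lines
  }
  .flatMap(_.split('\t'))                  // one element per "id,value" piece
  .filter(_.nonEmpty)                      // drop the "" sentinels
out.foreach(println)
```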


Re: FlatMapValues

2015-01-01 Thread Sanjay Subramanian
thanks let me try that out

Re: FlatMapValues

2015-01-01 Thread Hitesh Khamesra
How about this: apply flatMap per line. And in that function, parse each
line and return all the columns as per your need.



Re: FlatMapValues

2014-12-31 Thread Sanjay Subramanian
hey guys

Some of u may care :-) but this is just to give u a background on where I am
going with this. I have an iOS medical side effects app called MedicalSideFx. I
built the entire underlying data layer aggregation using Hadoop and currently
the search is based on Lucene. I am re-architecting the data layer by replacing
Hadoop with Spark and integrating FDA data, Canadian sidefx data and vaccines
sidefx data.

@Kapil, sorry but flatMapValues is being reported as undefined.
To give u a complete picture of the code (it's inside IntelliJ but that's only
for testing; the real code runs on spark-shell on my cluster):
https://github.com/sanjaysubramanian/msfx_scala/blob/master/src/main/scala/org/medicalsidefx/common/utils/AersReacColumnExtractor.scala

If u were to assume the dataset as
025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10

This present version of the code, the flatMap works but only gives me values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction

What I need is
025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction

thanks
sanjay

RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

Oh yes .. on flatMapValues, it's defined in PairRDDFunctions, and you need to
import org.apache.spark.SparkContext._ to use them
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil 



Re: FlatMapValues

2014-12-31 Thread Sean Owen



Re: FlatMapValues

2014-12-31 Thread Sanjay Subramanian
Hey Kapil, Fernando

Thanks for your mail.

[1] Fernando, if I don't use an "if" inside the "map" then, for lines of input
data that have fewer fields than I am expecting, I get an
ArrayIndexOutOfBoundsException. So the "if" is to safeguard against that.
[2] Kapil, I am sorry I did not clarify. Yes, my code DID NOT compile, saying
that flatMapValues is not defined.
In fact, when I used your snippet, the code still does not compile:

Error:(36, 57) value flatMapValues is not a member of
org.apache.spark.rdd.RDD[(String, String)]
    }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
    skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                    ^

My pom.xml looks like this:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.0</version>
</dependency>

[3] To summarize, all I want is to convert

SUMMARY
===
when a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink

I want to output the following and currently am not able to
1,red
1,blue
1,green
2,yellow
2,violet
2,pink

thanks

regards
sanjay
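For reference, once the PairRDDFunctions implicits are in scope, flatMapValues applies a function to each value and repeats the key for every element produced. A plain-collections emulation of that semantics (not the Spark API itself, just a sketch of what it does; the helper name and sample data are illustrative):

```scala
// Emulation of flatMapValues semantics on a plain Seq of pairs (no Spark):
// apply f to each value, keep the key for every element f produces.
def flatMapValues[K, V, U](pairs: Seq[(K, V)])(f: V => Seq[U]): Seq[(K, U)] =
  pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

val pairRdd = Seq(("025126", "Chills\tMalaise\tMyalgia"))
val exploded = flatMapValues(pairRdd)(v => v.split('\t').toSeq)
exploded.foreach { case (k, v) => println(s"$k,$v") }
```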


Re: FlatMapValues

2014-12-31 Thread Fernando O.
Hi Sanjay,

Doing an if inside a map sounds like a bad idea; it seems like you actually
want to filter and then apply map
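A sketch of that reordering with plain collections (no Spark; the sample rows follow the thread's example data). Filtering the malformed rows first means the map has a single return type, so the result stays a collection of pairs:

```scala
// Filter-then-map, sketched without Spark: drop bad rows up front,
// then every remaining row maps cleanly to (id, value) pairs.
val rows = Seq(
  "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10",
  "VAERS_ID,junk"
)
val pairs = rows
  .map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))   // no if/else needed later
  .flatMap(f => Seq(f(1), f(3), f(5), f(7), f(9)).map(v => (f(0), v)))
pairs.foreach { case (id, v) => println(s"$id,$v") }
```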

On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik  wrote:

>  Hi Sanjay,
>
>
>
> I tried running your code on spark shell piece by piece –
>
>
>
> // Setup
>
> val line1 = “025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10”
>
> val line2 = “025127,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10”
>
> val lines = Array[String](line1, line2)
>
>
>
> val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to
> begin with
>
>
>
> val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far,
> so good
>
> val r3 = r2.map(fields => {
>
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
> // Returns a pair (String, String), good
>
>   }
>
>   else {
>
> "" // Returns a String, bad
>
>   }
>
>   }) // RDD[Serializable] – PROBLEM
>
>
>
> I was not even able to apply flatMapValues since the filtered RDD passed
> to it is RDD[Serializable] and not a pair RDD. I am surprised how your code
> compiled correctly.
>
>
>
>
>
> The following changes in your snippet make it work as intended -
>
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ("","")
>   }
>   }).filter(pair => pair._1.length() > 0).flatMapValues(skus =>
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
>
> Please note that this too saves lines like (025126,Chills), i.e. with
> opening and closing brackets ( and ). If you want to get rid of them,
> better do another map operation to map pair to String.
>
>
>
> Kapil
>
>
>
> From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
> Sent: 31 December 2014 13:42
> Cc: user@spark.apache.org
> Subject: FlatMapValues
>
>
>
> hey guys
>
>
>
> My dataset is like this
>
>
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
>
>
> Intended output is
>
> ==
>
> 025126,Chills
>
> 025126,Injection site oedema
>
> 025126,Injection site reaction
>
> 025126,Malaise
>
> 025126,Myalgia
>
>
>
> My code is as follows but the flatMapValues does not work even after I have 
> created the pair RDD.
>
> 
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
>   }).filter(line => line.toString.length() > 0).flatMapValues(skus =>
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> 
>
>
>
> thanks
>
>
>
> sanjay
>


RE: FlatMapValues

2014-12-31 Thread Kapil Malik
Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site
reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site
reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin 
with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
 // Returns a pair (String, String), good
  }
  else {
"" // Returns a String, bad
  }
  }) // RDD[Serializable] – PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it 
is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled 
correctly.


The following changes in your snippet make it work as intended -


reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
("","")
  }
  }).filter(pair => pair._1.length() > 0).flatMapValues(skus => 
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening 
and closing brackets ( and ). If you want to get rid of them, better do another 
map operation to map pair to String.

Kapil
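
For completeness, here is one way the whole corrected pipeline could look, including the extra map that strips the tuple parentheses from the saved lines. A sketch only: reacRdd, outFile and the column positions come from the thread, and a live SparkContext is assumed.

```scala
// Sketch: reacRdd is assumed to be an RDD[String] of comma-separated VAERS rows.
reacRdd.map(line => line.split(','))
  .map(fields =>
    if (fields.length >= 11 && !fields(0).contains("VAERS_ID"))
      (fields(0), Seq(1, 3, 5, 7, 9).map(i => fields(i)).mkString("\t"))
    else
      ("", "")                                 // dummy pair keeps this an RDD[(String, String)]
  )
  .filter(pair => pair._1.nonEmpty)            // drop the dummy pairs
  .flatMapValues(skus => skus.split('\t'))     // (id, "a\tb") => (id, "a"), (id, "b")
  .map { case (id, sku) => s"$id,$sku" }       // "025126,Chills" rather than "(025126,Chills)"
  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
```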

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this

025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia


My code is as follows but the flatMapValues does not work even after I have 
created the pair RDD.



reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus => 
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)



thanks

sanjay


Re: FlatMapValues

2014-12-31 Thread Raghavendra Pandey
Why don't you push "\n" instead of "\t" in your first transformation [
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"
+fields(9)))] and then do saveAsTextFile?

-Raghavendra

On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian
 wrote:

> hey guys
>
> My dataset is like this
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
> Intended output is
> ==
> 025126,Chills
> 025126,Injection site oedema
> 025126,Injection site reaction
> 025126,Malaise
> 025126,Myalgia
>
> My code is as follows but the flatMapValues does not work even after I have 
> created the pair RDD.
>
> 
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
> ""
>   }
>   }).filter(line => line.toString.length() > 0).flatMapValues(skus => 
> skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> 
>
>
> thanks
>
> sanjay
>


FlatMapValues

2014-12-31 Thread Sanjay Subramanian
hey guys 
My dataset is like this 
025126,Chills,8.10,Injection site oedema,8.10,Injection site 
reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have
created the pair RDD.

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {

(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
""
  }
  }).filter(line => line.toString.length() > 0).flatMapValues(skus => 
skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + 
outFile)
thanks
sanjay
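
The approach the thread eventually lands on (Sean's flatMap suggestion at the top of this archive) can be checked in plain Scala, without Spark. A sketch using the sample row above, assuming reaction names and version numbers strictly alternate:

```scala
// Sketch: expand one CSV row into "id,reactionName" lines, skipping the version numbers.
val line = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val tokens = line.split(',')
val id = tokens(0)                              // "025126"
val names = tokens.tail.grouped(2).map(_(0))    // first element of each (name, version) pair
val out = names.map(name => s"$id,$name").toList
// out == List("025126,Chills", "025126,Injection site oedema",
//             "025126,Injection site reaction", "025126,Malaise", "025126,Myalgia")
```

Inside Spark the same body goes into a single flatMap over the input RDD; no pair RDD and no flatMapValues are needed.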

Re: Type problem in Java when using flatMapValues

2014-10-03 Thread Robin Keunen
Damn, you're right, I wasn't looking at it properly. I was confused by
IntelliJ, I guess.


Many thanks!


On 2014-10-02 19:02, Sean Owen wrote:

Eh, is it not that you are mapping the values of an RDD whose keys are
(String, String) pairs, but expecting the keys to be Strings? That's also about
what the compiler is saying too.

On Thu, Oct 2, 2014 at 4:15 PM, Robin Keunen  wrote:

Hi all,

I successfully implemented my algorithm in Scala but my team wants it in
Java. I have a problem with Generics, can anyone help me?

I have a first JavaPairRDD with a structure like ((ean, key), [from, to,
value])

ean and key are string
from and to are DateTime
value is a Double

JavaPairRDD> eanKeyTsParameters =
javaRDD.mapToPair( ... );

Then I try to do flatMapValues to apply the GenerateTimeSeries Function, it
takes the from, to and values to generate a List. Here is the
error I get when compiling:

error: incompatible types: no instance(s) of type variable(s) U exist so
that JavaPairRDD conforms to JavaPairRDD

Here is what IntelliJ tells me:
flatMapValues( Function, Iterable> ) in JavaPairRDD
cannot be applied to Transformations.GenerateTimeSeries

Here is the problematic transformation:

JavaPairRDD keyLongDouble =
 eanKeyTsParameters.flatMapValues(new
Transformations.GenerateTimeSeries());

And here is the Function:

import org.apache.spark.api.java.function.Function; [...]

public class Transformations {
 public static class GenerateTimeSeries
 implements Function, Iterable> {

 @Override
 public Iterable call(List args) {
 DateTime start = (DateTime) args.get(0);
 DateTime end = (DateTime) args.get(1);
 Double value  = (Double) args.get(2);
 int granularity = 24*60*60*1000; // 1 day

 return AggregationUtils.createTimeSeries(start, end, value,
granularity);
 }
 }
}

Any idea?

Thanks

--

Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be


--

Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Type problem in Java when using flatMapValues

2014-10-02 Thread Sean Owen
Eh, is it not that you are mapping the values of an RDD whose keys are
(String, String) pairs, but expecting the keys to be Strings? That's also about
what the compiler is saying too.

On Thu, Oct 2, 2014 at 4:15 PM, Robin Keunen  wrote:
> Hi all,
>
> I successfully implemented my algorithm in Scala but my team wants it in
> Java. I have a problem with Generics, can anyone help me?
>
> I have a first JavaPairRDD with a structure like ((ean, key), [from, to,
> value])
>
> ean and key are string
> from and to are DateTime
> value is a Double
>
> JavaPairRDD> eanKeyTsParameters =
> javaRDD.mapToPair( ... );
>
> Then I try to do flatMapValues to apply the GenerateTimeSeries Function, it
> takes the from, to and values to generate a List. Here is the
> error I get when compiling:
>
> error: incompatible types: no instance(s) of type variable(s) U exist so
> that JavaPairRDD conforms to JavaPairRDD
>
> Here is what IntelliJ tells me:
> flatMapValues( Function, Iterable> ) in JavaPairRDD
> cannot be applied to Transformations.GenerateTimeSeries
>
> Here is the problematic transformation:
>
> JavaPairRDD keyLongDouble =
> eanKeyTsParameters.flatMapValues(new
> Transformations.GenerateTimeSeries());
>
> And here is the Function:
>
> import org.apache.spark.api.java.function.Function; [...]
>
> public class Transformations {
> public static class GenerateTimeSeries
> implements Function, Iterable> {
>
> @Override
> public Iterable call(List args) {
> DateTime start = (DateTime) args.get(0);
> DateTime end = (DateTime) args.get(1);
> Double value  = (Double) args.get(2);
> int granularity = 24*60*60*1000; // 1 day
>
> return AggregationUtils.createTimeSeries(start, end, value,
> granularity);
> }
> }
> }
>
> Any idea?
>
> Thanks
>
> --
>
> Robin Keunen
> Software Engineer
> robin.keu...@lampiris.be
> www.lampiris.be




Type problem in Java when using flatMapValues

2014-10-02 Thread Robin Keunen

Hi all,

I successfully implemented my algorithm in Scala but my team wants it in 
Java. I have a problem with Generics, can anyone help me?


I have a first JavaPairRDD with a structure like ((ean, key), [from, to, 
value])


 * ean and key are string
 * from and to are DateTime
 * value is a Double

JavaPairRDD> eanKeyTsParameters = 
javaRDD.mapToPair( ... );


Then I try to do flatMapValues to apply the GenerateTimeSeries Function;
it takes the from, to and values to generate a List. Here is the error I
get when compiling:


error: incompatible types: no instance(s) of type variable(s) U exist so 
that JavaPairRDD conforms to JavaPairRDD


Here is what IntelliJ tells me:
flatMapValues( Function, Iterable> ) in 
JavaPairRDD cannot be applied to Transformations.GenerateTimeSeries


Here is the problematic transformation:

JavaPairRDD keyLongDouble =
eanKeyTsParameters.flatMapValues(new 
Transformations.GenerateTimeSeries());


And here is the Function:

import org.apache.spark.api.java.function.Function; [...]

public class Transformations {
    public static class GenerateTimeSeries
            implements Function, Iterable> {

        @Override
        public Iterable call(List args) {
            DateTime start = (DateTime) args.get(0);
            DateTime end = (DateTime) args.get(1);
            Double value = (Double) args.get(2);
            int granularity = 24*60*60*1000; // 1 day

            return AggregationUtils.createTimeSeries(start, end, value,
                    granularity);
        }
    }
}

Any idea?

Thanks

--

Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be
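
A postscript for readers hitting the same error: the archive rendering has stripped the generic type parameters from the Java code above, which is exactly where the bug lived; flatMapValues needs a Function from the value type (here a List holding start/end/value) to an Iterable of the new value type. The sketch below is a Spark-free reconstruction of that value flow. The concrete types are an assumption, since the originals were lost, and Tuple2/DateTime/AggregationUtils are replaced by Map.Entry and epoch-millis stand-ins:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GenerateTimeSeriesSketch {
    static final long GRANULARITY = 24L * 60 * 60 * 1000; // 1 day in ms

    // Stand-in for Function<List<Object>, Iterable<Map.Entry<Long, Double>>>:
    // expand (start, end, value) into one (timestamp, value) entry per day.
    static List<Map.Entry<Long, Double>> call(List<Object> args) {
        long start = (Long) args.get(0);
        long end = (Long) args.get(1);
        double value = (Double) args.get(2);
        List<Map.Entry<Long, Double>> series = new ArrayList<>();
        for (long t = start; t < end; t += GRANULARITY) {
            series.add(new SimpleEntry<>(t, value));
        }
        return series;
    }

    public static void main(String[] args) {
        List<Object> input = new ArrayList<>();
        input.add(0L);              // start
        input.add(3 * GRANULARITY); // end: three days later
        input.add(1.5);             // value
        System.out.println(call(input).size()); // one entry per day
    }
}
```

The compile error in the thread disappears once the U in Function<V, Iterable<U>> lines up with the value type of the resulting JavaPairRDD; the names and types here are illustrative, not Robin's originals.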