Re: map function

2014-12-04 Thread Yifan LI
Thanks, Paolo and Mark. :)


 On 04 Dec 2014, at 11:58, Paolo Platter paolo.plat...@agilelab.it wrote:
 
 Hi,
 
 rdd.flatMap(e => e._2.map(i => (i, e._1)))
 
 Should work, but I didn't test it so maybe I'm missing something.
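 
 For reference, a minimal self-contained sketch of this approach (the
 local SparkContext setup and variable names here are illustrative, not
 from the original posts):
 
 import org.apache.spark.{SparkConf, SparkContext}
 
 // Sample data shaped like the RDD in the question: (key, values).
 val sc = new SparkContext(
   new SparkConf().setAppName("flatMapExample").setMaster("local[*]"))
 val rdd = sc.parallelize(Seq(
   (1, Seq(10, 20)),
   (2, Seq(30, 40, 10)),
   (3, Seq(30))
 ))
 
 // flatMap emits one (value, key) pair per element of the inner
 // sequence, so a single input record can yield multiple output records.
 val pairs = rdd.flatMap(e => e._2.map(i => (i, e._1)))
 
 pairs.collect().foreach(println)
 // Prints (in some order): (10,1) (20,1) (30,2) (40,2) (10,2) (30,3)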
 
 Paolo
 
 Sent from my Windows Phone
 From: Yifan LI mailto:iamyifa...@gmail.com
 Sent: 04/12/2014 09:27
 To: user@spark.apache.org mailto:user@spark.apache.org
 Subject: map function
 
 Hi,
 
 I have an RDD like the one below:
 (1, (10, 20))
 (2, (30, 40, 10))
 (3, (30))
 …
 
 Is there any way to map it to this:
 (10,1)
 (20,1)
 (30,2)
 (40,2)
 (10,2)
 (30,3)
 …
 
 Generally, each element might be mapped to multiple output pairs.
 
 Thanks in advance! 
 
 
 Best,
 Yifan LI
 
 
 
 
 



Re: Map Function does not seem to be executing over RDD

2014-07-09 Thread Yana Kadiyska
Does the line println("Returning " + string) in the hash function
print what you expect? If you're not seeing that output in the
executor log, I'd also put some debug statements in the "case other"
branch, since your match in the interesting case is conditioned on
if (fieldsList.contains(index)) -- maybe that doesn't catch what you
think it should. If that's the case, you can dump out the contents of
fieldsList within the other case (i.e. inside the map) and see
what's there.
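
A minimal sketch of that debug step (only the println in the
fall-through case is new; the rest is the anonymizeFields method from
the original post):

def anonymizeFields(l: String): String = {
  l.split(delimiter, -1).zipWithIndex
    .map {
      case (str, index) if fieldsList.contains(index) => hash(str)
      case (str, index) =>
        // Debug: this branch means the index did NOT match, so print what
        // fieldsList actually holds to make any mismatch visible.
        println("index " + index + " not in fieldsList=" + fieldsList +
          ", keeping: " + str)
        str
    }.mkString(delimiter)
}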

On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman razaurreh...@gmail.com wrote:
 Hello everyone,

 I am having some problems with a simple Scala/Spark code in which I am
 trying to replace certain fields in a CSV with their hashes.

 import java.security.MessageDigest
 
 class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",")
   extends Serializable {
 
   // Hashes a single field value. Hex.valueOf is presumably a small helper
   // (not shown in the post) that turns a byte array into a hex string.
   def hash(s: String): String = {
     val md = MessageDigest.getInstance("sha")
     md.update(s.getBytes("UTF-8"))
 
     val digest = md.digest()
 
     val string: Option[String] = Option(digest).map(Hex.valueOf)
 
     println("Returning " + string)
     string.getOrElse("")
   }
 
   // Splits the line on the delimiter and replaces the fields whose index
   // appears in fieldsList with their hash; all other fields pass through.
   def anonymizeFields(l: String): String = {
     l.split(delimiter, -1).zipWithIndex
       .map {
         case (str, index) if fieldsList.contains(index) => hash(str)
         case other => other._1
       }.mkString(delimiter)
   }
 }

 I am calling the anonymize function like this, but anonData seems to be
 the same as the original dsvData:

 var dsvData = sc.textFile(inputPath + inputFileName).map(
   line => new DSV(line, List(1, 2), "\\|")
 )
 
 println("Lines Processed=" + dsvData.count())
 var anonData = dsvData.map(l => l.anonymizeFields(l.line))
 
 println("DSVs Processed=" + anonData.count())
 anonData.saveAsTextFile(outputPath + outputFileName)
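 
 (A quick way to verify on the driver whether the map changed anything,
 sketched here for reference -- the sample size of 5 is arbitrary:)
 
 // Compare a few input lines against their anonymized counterparts.
 dsvData.take(5).foreach(d => println("original:   " + d.line))
 anonData.take(5).foreach(s => println("anonymized: " + s))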

 I have tried the execution through the shell as well, but the problem
 persists. The job does finish, but the worker log shows the following
 error message:

 14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@host:60593] -
 [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with
 [akka.tcp://sparkExecutor@host:51397]] [

 Regards
 MRK