Re: map function
Thanks, Paolo and Mark. :)

On 04 Dec 2014, at 11:58, Paolo Platter paolo.plat...@agilelab.it wrote:

Hi,

    rdd.flatMap( e => e._2.map( i => ( i, e._1)))

should work, but I didn't test it, so maybe I'm missing something.

Paolo

Sent from my Windows Phone

From: Yifan LI iamyifa...@gmail.com
Sent: 04/12/2014 09:27
To: user@spark.apache.org
Subject: map function

Hi,

I have an RDD like below:

    (1, (10, 20))
    (2, (30, 40, 10))
    (3, (30))
    …

Is there any way to map it to this:

    (10,1)
    (20,1)
    (30,2)
    (40,2)
    (10,2)
    (30,3)
    …

Generally, each element might be mapped to multiple output elements.

Thanks in advance!

Best,
Yifan LI
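A minimal, self-contained sketch of the suggested flatMap inversion. It assumes the values are held in a Seq (the exact element type isn't shown in the original post) and uses a local master just for demonstration:

    import org.apache.spark.{SparkConf, SparkContext}

    object FlatMapInvert {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("flatmap-invert").setMaster("local[*]"))

        // (key, values) pairs as in the original post
        val rdd = sc.parallelize(Seq(
          (1, Seq(10, 20)),
          (2, Seq(30, 40, 10)),
          (3, Seq(30))
        ))

        // Emit one (value, key) pair per element of each values Seq
        val inverted = rdd.flatMap { case (key, values) => values.map(v => (v, key)) }

        inverted.collect().foreach(println)
        // (10,1) (20,1) (30,2) (40,2) (10,2) (30,3)

        sc.stop()
      }
    }

The key point is flatMap rather than map: each input pair expands into as many output pairs as it has values, and flatMap concatenates those per-element collections into one RDD.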
Re: Map Function does not seem to be executing over RDD
Does this line in the hash function, println("Retuning " + string), print what you expect? If you're not seeing that output in the executor log, I'd also put some debug statements in the other case: the interesting case in your match is guarded by if (fieldsList.contains(index)), and maybe that doesn't catch what you think it should. If that's the case, you can dump out the contents of fieldsList within the other case (i.e. inside the map) and see what's there.

On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman razaurreh...@gmail.com wrote:

Hello everyone,

I am having some problems with a simple Scala/Spark job in which I am trying to replace certain fields in a CSV with their hashes:

    class DSV(var line: String = "", fieldsList: Seq[Int],
              var delimiter: String = ",") extends Serializable {

      def hash(s: String): String = {
        val md = MessageDigest.getInstance("SHA")
        md.update(s.getBytes("UTF-8"))
        val digest = md.digest()
        val string: Option[String] = Option(digest).map(Hex.valueOf)
        println("Retuning " + string)
        string.getOrElse("")
      }

      def anonymizeFields(l: String): String = {
        l.split(delimiter, -1).zipWithIndex
          .map {
            case (str, index) if (fieldsList.contains(index)) => hash(str)
            case other => other._1
          }.mkString(delimiter)
      }
    }

I am calling the anonymize function like this, but anonData seems to be the same as the original dsvData:

    var dsvData = sc.textFile(inputPath + inputFileName)
      .map(line => new DSV(line, List(1, 2), "\\|"))
    println("Lines Processed=" + dsvData.count())

    var anonData = dsvData.map(l => l.anonymizeFields(l.line))
    println("DSVs Processed=" + anonData.count())
    anonData.saveAsTextFile(outputPath + outputFileName)

I have tried the execution through the shell as well, but the problem persists. The job does finish, but the worker log shows the following error message:

    14/07/09 11:30:20 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [

Regards,
MRK
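To make the guard concrete: zipWithIndex numbers fields from 0, so List(1, 2) selects the second and third fields, not the first and second. A minimal local sketch of the anonymization logic (plain Scala, no Spark; sha256Hex is a hypothetical stand-in for the poster's Hex.valueOf helper, which isn't shown in the thread):

    import java.security.MessageDigest

    object AnonymizeSketch {
      // Hypothetical replacement for the poster's Hex.valueOf helper
      def sha256Hex(s: String): String =
        MessageDigest.getInstance("SHA-256")
          .digest(s.getBytes("UTF-8"))
          .map("%02x".format(_))
          .mkString

      def anonymizeFields(line: String, fieldsList: Seq[Int],
                          delimiter: String): String =
        line.split(delimiter, -1).zipWithIndex.map {
          case (str, index) if fieldsList.contains(index) => sha256Hex(str)
          case (str, _)                                   => str
        }.mkString("|") // mkString wants the literal separator;
                        // reusing the regex "\\|" here would emit a stray backslash

      def main(args: Array[String]): Unit = {
        // Indices are 0-based: List(1, 2) hashes "bob" and "secret", not "alice"
        println(anonymizeFields("alice|bob|secret", List(1, 2), "\\|"))
      }
    }

Note the asymmetry in the original code: split takes a regex (hence "\\|"), while mkString takes a literal string, so joining with the same "\\|" value silently changes the output format even when the hashing itself works.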