Hello everyone, I am having a problem with a simple Scala/Spark job in which I am trying to replace certain fields in a CSV with their hashes:
    import java.security.MessageDigest

    class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",") extends Serializable {

      def hash(s: String): String = {
        val md = MessageDigest.getInstance("sha")
        md.update(s.getBytes("UTF-8"))
        val digest = md.digest()
        // Hex.valueOf converts the digest bytes to a hex string
        val string: Option[String] = Option(digest).map(Hex.valueOf)
        println("Returning " + string)
        string.getOrElse("")
      }

      def anonymizeFields(l: String): String = {
        l.split(delimiter, -1).zipWithIndex.map {
          case (str, index) if fieldsList.contains(index) => hash(str)
          case other => other._1
        }.mkString(delimiter)
      }
    }

I am calling the anonymize function like this, but anonData appears to be identical to the original dsvData:

    val dsvData = sc.textFile(inputPath + inputFileName)
      .map(line => new DSV(line, List(1, 2), "\\|"))
    println("Lines Processed = " + dsvData.count())

    val anonData = dsvData.map(l => l.anonymizeFields(l.line))
    println("DSVs Processed = " + anonData.count())

    anonData.saveAsTextFile(outputPath + outputFileName)

I have tried running it through the shell as well, but the problem persists. The job does finish, but the worker log shows the following error message:

    14/07/09 11:30:20 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [

Regards,
MRK
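P.S. For anyone trying to reproduce this without a cluster, below is a minimal, Spark-free sketch of the same hashing logic. The toHex helper is a hypothetical stand-in for Hex.valueOf, the algorithm name is written out as "SHA-1", and the split and join delimiters are passed separately because String.split takes a regex while mkString takes a literal separator:

    import java.security.MessageDigest

    object AnonymizeSketch {
      // Hypothetical stand-in for Hex.valueOf: render bytes as lowercase hex.
      def toHex(bytes: Array[Byte]): String = bytes.map(b => f"$b%02x").mkString

      // Same idea as DSV.hash, with an explicit algorithm name.
      def hash(s: String): String = {
        val md = MessageDigest.getInstance("SHA-1")
        toHex(md.digest(s.getBytes("UTF-8")))
      }

      // splitPattern is a regex (argument to split); joinWith is a literal
      // string (argument to mkString) -- for "|" they are not interchangeable.
      def anonymizeFields(line: String, fields: Seq[Int],
                          splitPattern: String, joinWith: String): String =
        line.split(splitPattern, -1).zipWithIndex.map {
          case (str, index) if fields.contains(index) => hash(str)
          case (str, _)                               => str
        }.mkString(joinWith)

      def main(args: Array[String]): Unit = {
        // Fields 1 and 2 should come back hashed; fields 0 and 3 untouched.
        println(anonymizeFields("a|b|c|d", List(1, 2), "\\|", "|"))
      }
    }

Running this locally prints the first and last fields unchanged with the middle two replaced by hex digests, which is the behavior I expect from the Spark job.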