Hello everyone,

I am having a problem with a simple piece of Scala/Spark code in which I am
trying to replace certain fields in a CSV with their hashes.

import java.security.MessageDigest

class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",") extends Serializable {

        // Hash a single field value with SHA and return it as a hex string.
        def hash(s: String): String = {
                val md = MessageDigest.getInstance("SHA")
                md.update(s.getBytes("UTF-8"))

                val digest = md.digest()

                val string: Option[String] = Option(digest).map(Hex.valueOf)

                println("Returning " + string)
                string.getOrElse("")
        }

        // Split the line on the delimiter and hash only the fields whose
        // index is in fieldsList, leaving the other fields untouched.
        def anonymizeFields(l: String): String = {
                l.split(delimiter, -1).zipWithIndex
                        .map {
                                case (str, index) if fieldsList.contains(index) => hash(str)
                                case other => other._1
                        }
                        .mkString(delimiter)
        }
}
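
For completeness, Hex is just a small bytes-to-hex helper. The stand-alone sketch below (the Hex object here is a stand-in I am including for this mail, not necessarily identical to the one in my job) compiles against the class above and shows what I expect anonymizeFields to produce:

// Stand-in bytes-to-hex helper, only for this example.
object Hex {
        def valueOf(buf: Array[Byte]): String = buf.map("%02x".format(_)).mkString
}

object DSVLocalTest {
        def main(args: Array[String]): Unit = {
                // Fields 1 and 2 should come back as hex digests, field 0 unchanged.
                val dsv = new DSV("alice|secret|42", List(1, 2), "\\|")
                println(dsv.anonymizeFields(dsv.line))
        }
}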

I am calling the anonymize function like this, but anonData seems to be
the same as the original dsvData:

val dsvData = sc.textFile(inputPath + inputFileName).map(
        line => new DSV(line, List(1, 2), "\\|")
)

println("Lines Processed=" + dsvData.count())
val anonData = dsvData.map(l => l.anonymizeFields(l.line))

println("DSVs Processed=" + anonData.count())
anonData.saveAsTextFile(outputPath + outputFileName)
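
If it helps narrow things down, a quick check along these lines (just RDD.take on the driver, using the same objects as above) should show whether the hashes are actually being applied before the save:

// Compare a few raw lines with their anonymized versions on the driver.
dsvData.take(5).foreach { d =>
        println("raw : " + d.line)
        println("anon: " + d.anonymizeFields(d.line))
}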

I have tried running it through the shell as well, but the problem persists.
The job does finish, but the worker log shows the following error message:

14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]:
Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [

Regards
MRK
