Does the println("Retuning "+string) line in your hash function print
what you expect? If that output never shows up in the executor log, I'd
also put some debug statements in "case other". Your match in the
"interesting" case is guarded by if( fieldsList.contains(index)), and
that guard may not catch what you think it should. If that's the case,
dump the contents of fieldsList from within the "other" case (i.e.
inside the map) and see what's there.
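
Something along these lines is what I have in mind. It is only a rough
sketch that runs without Spark: the object name DebugAnonymize and the
hashStub helper are made up for illustration and stand in for your DSV
class and real hash function.

```scala
// Hypothetical stand-alone version of anonymizeFields with debug output.
// hashStub is a placeholder for the real hash function.
object DebugAnonymize {
  def hashStub(s: String): String = "<hashed:" + s + ">"

  def anonymizeFields(l: String, fieldsList: Seq[Int], delimiter: String): String =
    l.split(delimiter, -1).zipWithIndex.map {
      case (str, index) if fieldsList.contains(index) =>
        // Guard matched: this field is replaced by its hash.
        println("hashing field " + index + ": " + str)
        hashStub(str)
      case (str, index) =>
        // Guard did not match: show the index and what fieldsList holds.
        println("skipping field " + index + ", fieldsList=" + fieldsList)
        str
    }.mkString(delimiter)
}
```

When the same println calls run inside an RDD map, the output ends up in
the executor's stdout rather than on the driver console.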

On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman <razaurreh...@gmail.com> wrote:
> Hello everyone
>
> I am having a problem with some simple Scala/Spark code in which I am
> trying to replace certain fields in a CSV with their hashes
>
> class DSV (var line:String="",fieldsList:Seq[Int], var delimiter:String=",")
> extends Serializable {
>
>         def hash(s:String):String={
>                 var md = MessageDigest.getInstance("sha")
>                 md.update(s.getBytes("UTF-8"))
>
>                 var digest = md.digest()
>
>                 val string:Option[String] = Option(digest).map(Hex.valueOf)
>
>                 println("Retuning "+string)
>                 string.getOrElse("")
>         }
>
>         def anonymizeFields(l:String):String ={
>                 l.split(delimiter,-1).zipWithIndex
>                 .map {
>                         case (str, index) if( fieldsList.contains(index)) =>hash(str)
>                         case other => other._1
>                 }.mkString(delimiter)
>         }
> }
>
> I am calling the anonymize function like this but the anondata seems to be
> the same as the original dsvData
>
> var dsvData = sc.textFile(inputPath+inputFileName).map(
>                         line=>(new DSV(line,List(1,2),  "\\|"))
>                 )
>
>                 println("Lines Processed="+dsvData.count())
>                 var anonData = dsvData.map(l=>l.anonymizeFields(l.line))
>
>                 println("DSVs Processed="+anonData.count())
>                 anonData.saveAsTextFile(outputPath+outputFileName)
>
> I have tried the execution through shell as well but the problem persists.
> The job does finish but the worker log shows the following error message
>
> 14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkWorker@host:60593] ->
> [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with
> [akka.tcp://sparkExecutor@host:51397]] [
>
> Regards
> MRK
