Map Function does not seem to be executing over RDD

2014-07-09 Thread Raza Rehman
Hello everyone

I am having a problem with a simple Scala/Spark program in which I am
trying to replace certain fields in a CSV with their hashes:

import java.security.MessageDigest

class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",") extends Serializable {

  def hash(s: String): String = {
    val md = MessageDigest.getInstance("SHA")
    md.update(s.getBytes("UTF-8"))

    val digest = md.digest()

    val string: Option[String] = Option(digest).map(Hex.valueOf)

    println("Returning " + string)
    string.getOrElse("")
  }

  def anonymizeFields(l: String): String = {
    l.split(delimiter, -1).zipWithIndex
      .map {
        case (str, index) if fieldsList.contains(index) => hash(str)
        case other => other._1
      }.mkString(delimiter)
  }
}
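Hex is not defined above; a minimal sketch of such a helper, assuming from its
use as Hex.valueOf(digest) that it just hex-encodes the digest bytes:

// Hypothetical helper, not part of the original post; the name and
// signature are inferred from the call Option(digest).map(Hex.valueOf).
object Hex {
  def valueOf(bytes: Array[Byte]): String =
    bytes.map("%02x".format(_)).mkString
}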

I am calling the anonymizeFields function like this, but anonData seems to be
the same as the original dsvData:

val dsvData = sc.textFile(inputPath + inputFileName).map(
  line => new DSV(line, List(1, 2), "\\|")
)

println("Lines Processed=" + dsvData.count())
val anonData = dsvData.map(l => l.anonymizeFields(l.line))

println("DSVs Processed=" + anonData.count())
anonData.saveAsTextFile(outputPath + outputFileName)
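As a quick sanity check, the same logic can be exercised locally without Spark
(a sketch; the sample line is made up):

// Local check of anonymizeFields: fields at indices 1 and 2 should be hashed.
val d = new DSV("alice|bob|carol|dave", List(1, 2), "\\|")
println(d.anonymizeFields(d.line))
// Note: split treats the delimiter as a regex, while mkString uses it
// literally, so the output here is rejoined with \| rather than |.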

I have tried running it through the shell as well, but the problem persists.
The job does finish, but the worker log shows the following error message:

14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@host:60593] -> [akka.tcp://sparkExecutor@host:51397]:
Error [Association failed with [akka.tcp://sparkExecutor@host:51397]] [

Regards
MRK


Re: Map Function does not seem to be executing over RDD

2014-07-09 Thread Yana Kadiyska
Does this line, println("Returning " + string), from the hash function
print what you expect? If you're not seeing that output in the
executor log, I'd also put some debug statements in case other, since
your match in the interesting case is conditioned on
if (fieldsList.contains(index)) -- maybe that doesn't catch what you
think it should. If that's the case, you can dump out the contents of
fieldsList within the other case (i.e. inside the map) and see
what's there...
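Something like this (an untested sketch; the messages are just illustrative):

// Instrumented version of the match, to see which case each field hits.
l.split(delimiter, -1).zipWithIndex.map {
  case (str, index) if fieldsList.contains(index) =>
    println(s"hashing field $index: $str")
    hash(str)
  case (str, index) =>
    println(s"skipping field $index; fieldsList = $fieldsList")
    str
}.mkString(delimiter)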

On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman razaurreh...@gmail.com wrote: