It seems Spark is not able to serialize your function code so that it can be shipped to the worker nodes.

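In the spark-shell this error usually means the closure passed to map is dragging in something non-serializable (often the surrounding REPL/driver object). One common workaround is to move the aggregation into a small standalone serializable object and call it from mapValues. Below is only a rough sketch of that idea with made-up names (RowAggregator, percentagesFor), not your exact logic:

import org.apache.spark.sql.Row

// Illustrative only: a standalone object, so the task ships just this object
// rather than the surrounding shell state.
object RowAggregator extends Serializable {
  // For one user's rows, count how often each value appears in each column
  // and turn the counts into "value:fraction|value:fraction" strings.
  def percentagesFor(rows: Iterable[Row]): Map[Int, String] = {
    val total = rows.size.toDouble
    val counts = scala.collection.mutable.Map[(Int, String), Int]().withDefaultValue(0)
    for (row <- rows; i <- 0 until row.length)
      counts((i, row(i).toString)) += 1
    counts.toSeq
      .groupBy { case ((col, _), _) => col }
      .map { case (col, vs) =>
        col -> vs.map { case ((_, value), c) => value + ":" + c / total }.mkString("|")
      }
  }
}

// usage sketch, assuming df is your 30-column DataFrame keyed on the first column:
// df.rdd.map(r => (r(0), r)).groupByKey().mapValues(RowAggregator.percentagesFor).collect()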
Separately, I have tried to put together a solution as a simple set of RDD commands. Maybe you can combine the last four lines into a function (a rough sketch of that is included after the commands below).


val arr = Array(
  (1, "A", "<20", "0"), (1, "A", ">20 & <40", "1"), (1, "B", ">20 & <40", "0"),
  (1, "C", ">20 & <40", "0"), (1, "C", ">20 & <40", "0"), (2, "A", "<20", "0"),
  (3, "B", ">20 & <40", "1"), (3, "B", ">40", "2"))

val rdd = sc.parallelize(arr)

// total number of records per user
val prdd = rdd.map(a => (a._1, a))
val totals = prdd.groupByKey.map(a => (a._1, a._2.size))

// count of each C1 value per user
val n1 = rdd.map(a => ((a._1, a._2), 1))
val n2 = n1.reduceByKey(_ + _).map(a => (a._1._1, (a._1._2, a._2)))
// divide by the per-user total to get the fraction
val n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2)))
// collapse to "value:fraction|value:fraction|..." per user
val n4 = n3.map(a => (a._1, a._2._1 + ":" + a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)

n4.collect.foreach(println)

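Note that the commands above only compute the affinity for the second field (C1). Here is a rough sketch, with a made-up helper name (colAffinity), of how the last four lines could become a function that is reused for C2 and C3 as well; treat it as an illustration rather than tested code:

// Hypothetical helper: per-user affinity of one column, where `col` picks the
// field of interest out of the (UserId, C1, C2, C3) tuple.
def colAffinity(data: org.apache.spark.rdd.RDD[(Int, String, String, String)],
                totals: org.apache.spark.rdd.RDD[(Int, Int)],
                col: ((Int, String, String, String)) => String) = {
  data.map(a => ((a._1, col(a)), 1))
    .reduceByKey(_ + _)                                   // occurrences per (user, value)
    .map { case ((user, value), cnt) => (user, (value, cnt)) }
    .join(totals)                                         // attach per-user record count
    .map { case (user, ((value, cnt), total)) => (user, value + ":" + cnt.toDouble / total) }
    .reduceByKey(_ + "|" + _)                             // "value:fraction|value:fraction"
}

val c1 = colAffinity(rdd, totals, _._2)
val c2 = colAffinity(rdd, totals, _._3)
val c3 = colAffinity(rdd, totals, _._4)
c1.join(c2).join(c3).collect.foreach(println)   // per user: ((C1 string, C2 string), C3 string)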



On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com> wrote:

> Hi,
>
> I have a scenario wherein my dataset has around 30 columns. It is
> basically user activity information. I need to group the information by
> each user and then for each column/activity parameter I need to find the
> percentage affinity for each value in that column for that user. Below is
> the sample input and output.
>
> UserId C1 C2 C3
> 1 A <20 0
> 1 A >20 & <40 1
> 1 B >20 & <40 0
> 1 C >20 & <40 0
> 1 C >20 & <40 0
> 2 A <20 0
> 3 B >20 & <40 1
> 3 B >40 2
>
> Output
>
> UserId C1 C2 C3
> 1 A:0.4|B:0.2|C:0.4 <20:0.2|>20 & <40:0.8 0:0.8|1:0.2
> 2 A:1 <20:1 0:1
> 3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5
>
> Presently this is how I am calculating these values:
> Group by UserId and C1 and compute the values of C1 for all the users, then
> group by UserId and C2 and find the fractions for C2 for each user, and so on.
> This approach is quite slow. Also, the number of records for each user will
> be at most 30. So I would like to take a second approach wherein I do a
> groupByKey and pass the entire list of records for each key to a function
> which computes all the percentages for each column for each user at once.
> Below are the steps I am trying to follow:
>
> 1. Dataframe1 => group by UserId, find the counts of records for each
> user. Join the results back to the input so that the counts are available
> with each record.
> 2. Dataframe1.map(s => (s(1), s)).groupByKey().map(s => myUserAggregator(s._2))
>
> def myUserAggregator(rows: Iterable[Row]): scala.collection.mutable.Map[Int,String] = {
>     val returnValue = scala.collection.mutable.Map[Int,String]()
>     if (rows != null) {
>       val activityMap = scala.collection.mutable.Map[Int, scala.collection.mutable.Map[String, Int]]()
>         .withDefaultValue(scala.collection.mutable.Map[String, Int]().withDefaultValue(0))
>
>       val rowIt = rows.iterator
>       var sentCount = 1
>       for (row <- rowIt) {
>         sentCount = row(29).toString().toInt
>         for (i <- 0 until row.length) {
>           var m = activityMap(i)
>           if (activityMap(i) == null) {
>             m = collection.mutable.Map[String, Int]().withDefaultValue(0)
>           }
>           m(row(i).toString()) += 1
>           activityMap.update(i, m)
>         }
>       }
>       var activityPPRow: Row = Row()
>       for((k,v) <- activityMap) {
>           var rowVal:String = ""
>           for((a,b) <- v) {
>             rowVal += rowVal + a + ":" + b/sentCount + "|"
>           }
>           returnValue.update(k, rowVal)
>     //      activityPPRow.apply(k) = rowVal
>       }
>
>     }
>     return returnValue
>   }
>
> When I run step 2 I get the following error. I am new to Scala and Spark
> and am unable to figure out how to pass the Iterable[Row] to a function and
> get back the results.
>
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:317)
>     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
> ......
>
>
> Thanks for the help.
>
> Regards,
> Neha Mehta
>
