It seems Spark is not able to serialize your function code to worker nodes.
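In the spark-shell this usually happens because the closure passed to map() drags in the enclosing REPL object, which can hold members that are not serializable. Two common workarounds are to copy whatever the function needs into local vals before the map, or to define the function inside a standalone serializable object. A minimal sketch of the latter, assuming you keep your myUserAggregator approach (the object name is just a placeholder and the body is elided):

import org.apache.spark.sql.Row

object UserAffinity extends Serializable {
  def myUserAggregator(rows: Iterable[Row]): scala.collection.mutable.Map[Int, String] = {
    // same body as in your mail; nothing from the driver is captured here
    scala.collection.mutable.Map[Int, String]()
  }
}

// Dataframe1.map(s => (s(1), s)).groupByKey().map(p => UserAffinity.myUserAggregator(p._2))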
I have tried to put together a solution as a simple set of commands. Maybe you can combine the last four lines into a function (see the sketch at the bottom of this mail).

val arr = Array((1,"A","<20","0"), (1,"A",">20 & <40","1"),
                (1,"B",">20 & <40","0"), (1,"C",">20 & <40","0"),
                (1,"C",">20 & <40","0"), (2,"A","<20","0"),
                (3,"B",">20 & <40","1"), (3,"B",">40","2"))
val rdd = sc.parallelize(arr)
val prdd = rdd.map(a => (a._1, a))
val totals = prdd.groupByKey.map(a => (a._1, a._2.size))
var n1 = rdd.map(a => ((a._1, a._2), 1))
var n2 = n1.reduceByKey(_ + _).map(a => (a._1._1, (a._1._2, a._2)))
var n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2)))
var n4 = n3.map(a => (a._1, a._2._1 + ":" + a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)
n4.collect.foreach(println)

On Mon, Jan 18, 2016 at 6:47 AM, Neha Mehta <nehamehta...@gmail.com> wrote:
> Hi,
>
> I have a scenario wherein my dataset has around 30 columns. It is
> basically user activity information. I need to group the information by
> each user, and then for each column/activity parameter I need to find the
> percentage affinity for each value in that column for that user. Below are
> the sample input and output.
>
> UserId  C1  C2          C3
> 1       A   <20         0
> 1       A   >20 & <40   1
> 1       B   >20 & <40   0
> 1       C   >20 & <40   0
> 1       C   >20 & <40   0
> 2       A   <20         0
> 3       B   >20 & <40   1
> 3       B   >40         2
>
> Output
>
> 1  A:0.4|B:0.2|C:0.4  <20:0.2|>20 & <40:0.8  0:0.8|1:0.2
> 2  A:1                <20:1                  0:1
> 3  B:1                >20 & <40:0.5|>40:0.5  1:0.5|2:0.5
>
> Presently this is how I am calculating these values: group by UserId and
> C1 and compute the values for C1 for all the users, then group by UserId
> and C2 and find the fractions for C2 for each user, and so on. This
> approach is quite slow. Also, the number of records for each user will be
> at most 30. So I would like to take a second approach wherein I do a
> groupByKey and pass the entire list of records for each key to a function
> which computes all the percentages for each column for each user at once.
> Below are the steps I am trying to follow:
>
> 1. Dataframe1 => group by UserId, find the counts of records for each
> user. Join the results back to the input so that the counts are available
> with each record.
> 2. Dataframe1.map(s => (s(1), s)).groupByKey().map(s => myUserAggregator(s._2))
>
> def myUserAggregator(rows: Iterable[Row]): scala.collection.mutable.Map[Int, String] = {
>   val returnValue = scala.collection.mutable.Map[Int, String]()
>   if (rows != null) {
>     val activityMap = scala.collection.mutable.Map[Int,
>       scala.collection.mutable.Map[String, Int]]().withDefaultValue(
>       scala.collection.mutable.Map[String, Int]().withDefaultValue(0))
>
>     val rowIt = rows.iterator
>     var sentCount = 1
>     for (row <- rowIt) {
>       sentCount = row(29).toString().toInt
>       for (i <- 0 until row.length) {
>         var m = activityMap(i)
>         if (activityMap(i) == null) {
>           m = collection.mutable.Map[String, Int]().withDefaultValue(0)
>         }
>         m(row(i).toString()) += 1
>         activityMap.update(i, m)
>       }
>     }
>     var activityPPRow: Row = Row()
>     for ((k, v) <- activityMap) {
>       var rowVal: String = ""
>       for ((a, b) <- v) {
>         rowVal += a + ":" + b.toDouble / sentCount + "|"
>       }
>       returnValue.update(k, rowVal)
>       // activityPPRow.apply(k) = rowVal
>     }
>   }
>   return returnValue
> }
>
> When I run step 2 I get the following error. I am new to Scala and Spark
> and am unable to figure out how to pass the Iterable[Row] to a function
> and get back the results.
>
> org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
>         at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
>         at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>         at org.apache.spark.rdd.RDD.map(RDD.scala:317)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:102)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:104)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:106)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:108)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:110)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:112)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:114)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:116)
>         ......
>
> Thanks for the help.
>
> Regards,
> Neha Mehta
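P.S. A minimal sketch of the "combine the last four lines into a function" idea mentioned above, so the same computation can be reused for each column (untested, names are just placeholders; it expects (userId, columnValue) pairs plus the per-user totals computed earlier):

def columnAffinity(pairs: org.apache.spark.rdd.RDD[(Int, String)],
                   totals: org.apache.spark.rdd.RDD[(Int, Int)]) = {
  pairs.map(a => ((a._1, a._2), 1))                                   // count each (user, value)
       .reduceByKey(_ + _)
       .map(a => (a._1._1, (a._1._2, a._2)))                          // (user, (value, count))
       .join(totals)                                                  // bring in per-user record totals
       .map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2))) // count / total
       .map(a => (a._1, a._2._1 + ":" + a._2._2))
       .reduceByKey((a, b) => a + "|" + b)                            // e.g. "A:0.4|B:0.2|C:0.4"
}

// e.g. for C1: columnAffinity(rdd.map(a => (a._1, a._2)), totals).collect.foreach(println)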