Hi, I have a dataset with around 30 columns of user activity information. I need to group it by user and then, for each column (activity parameter), find the percentage affinity of each value in that column for that user. Below is a sample input and output.
Input:

```
UserId  C1  C2         C3
1       A   <20        0
1       A   >20 & <40  1
1       B   >20 & <40  0
1       C   >20 & <40  0
1       C   >20 & <40  0
2       A   <20        0
3       B   >20 & <40  1
3       B   >40        2
```

Output:

```
1  A:0.4|B:0.2|C:0.4  <20:0.2|>20 & <40:0.8  0:0.8|1:0.2
2  A:1                <20:1                  0:1
3  B:1                >20 & <40:0.5|>40:0.5  1:0.5|2:0.5
```

Presently this is how I am calculating these values: group by UserId and C1 and compute the fractions of C1 for all users, then group by UserId and C2 and find the fractions of C2 for each user, and so on for every column. This approach is quite slow. Also, each user will have at most 30 records. So I would like to take a second approach, wherein I do a groupByKey and pass the entire list of records for each key to a function that computes all the percentages for every column of that user at once. These are the steps I am trying to follow:

1. Dataframe1 => group by UserId and find the count of records for each user. Join the results back to the input so that the count is available with each record.
2. `Dataframe1.map(s => (s(1), s)).groupByKey().map(s => myUserAggregator(s._2))`

```scala
def myUserAggregator(rows: Iterable[Row]): scala.collection.mutable.Map[Int, String] = {
  val returnValue = scala.collection.mutable.Map[Int, String]()
  if (rows != null) {
    // column index -> (value -> occurrence count)
    val activityMap =
      scala.collection.mutable.Map[Int, scala.collection.mutable.Map[String, Int]]()
    var sentCount = 1
    for (row <- rows) {
      sentCount = row(29).toString.toInt // per-user record count joined in step 1
      for (i <- 0 until row.length) {
        // getOrElseUpdate gives each column its own counter map; a single
        // withDefaultValue default instance would be shared across all columns
        val m = activityMap.getOrElseUpdate(
          i, scala.collection.mutable.Map[String, Int]().withDefaultValue(0))
        m(row(i).toString) += 1
      }
    }
    for ((k, v) <- activityMap) {
      var rowVal = ""
      for ((a, b) <- v) {
        // b.toDouble avoids integer division, which would truncate every fraction to 0
        rowVal += a + ":" + b.toDouble / sentCount + "|"
      }
      returnValue.update(k, rowVal)
    }
  }
  returnValue
}
```

When I run step 2 I get the following error.
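To make the computation I am after concrete, here is a minimal plain-Scala sketch of the per-column fraction logic over one user's rows held in memory. This is only an illustration of the desired output, not the Spark job itself; the object and method names are made up for the example:

```scala
// Per-user affinity sketch: each record is a Seq of column values (C1, C2, C3).
object AffinitySketch {
  // For one user's rows, return column index -> "value:fraction|value:fraction..."
  def userAffinity(rows: Seq[Seq[String]]): Map[Int, String] = {
    val n = rows.size.toDouble
    (0 until rows.head.size).map { i =>
      // count occurrences of each distinct value in column i
      val counts = rows.map(_(i)).groupBy(identity).map { case (v, occ) => (v, occ.size) }
      // format as "value:fraction" pairs joined with "|", sorted for a stable order
      val formatted = counts.toSeq.sortBy(_._1)
        .map { case (v, c) => s"$v:${c / n}" }
        .mkString("|")
      i -> formatted
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // user 3's rows from the sample input above
    val user3 = Seq(
      Seq("B", ">20 & <40", "1"),
      Seq("B", ">40", "2"))
    println(userAffinity(user3))
  }
}
```

For user 3 this produces `B:1.0` for C1, `>20 & <40:0.5|>40:0.5` for C2, and `1:0.5|2:0.5` for C3, matching the expected output.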
I am new to Scala and Spark and am unable to figure out how to pass the Iterable[Row] to a function and get the results back.

```
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.map(RDD.scala:317)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    ......
```

Thanks for the help.

Regards,
Neha Mehta