Hi,

I have a scenario where my dataset has around 30 columns of user activity
information. I need to group the records by user and then, for each
column (activity parameter), find that user's percentage affinity for each
value in the column, i.e. the fraction of the user's records that have that
value. Below are a sample input and the expected output.

UserId  C1  C2         C3
1       A   <20        0
1       A   >20 & <40  1
1       B   >20 & <40  0
1       C   >20 & <40  0
1       C   >20 & <40  0
2       A   <20        0
3       B   >20 & <40  1
3       B   >40        2

Output (each cell lists value:fraction pairs separated by "|"):

UserId  C1                 C2                     C3
1       A:0.4|B:0.2|C:0.4  <20:0.2|>20 & <40:0.8  0:0.8|1:0.2
2       A:1                <20:1                  0:1
3       B:1                >20 & <40:0.5|>40:0.5  1:0.5|2:0.5
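To make the expected output concrete, here is a small plain-Scala sketch (no Spark) that reproduces the sample output from the sample input. The `AffinitySketch` object, the `Activity` case class and the `fmt` helper are illustrative names, not part of any existing code. `fmt` prints whole fractions as `1` to match the sample, and values within a column are sorted as strings, which happens to give the order shown above.

```scala
// Plain-Scala sketch (no Spark) of the per-user, per-column affinity computation.
object AffinitySketch {
  // Illustrative record type: the user id plus the activity columns C1..Cn as strings.
  final case class Activity(userId: Int, cols: Vector[String])

  // The sample input from above.
  val sample: Seq[Activity] = Seq(
    Activity(1, Vector("A", "<20", "0")),
    Activity(1, Vector("A", ">20 & <40", "1")),
    Activity(1, Vector("B", ">20 & <40", "0")),
    Activity(1, Vector("C", ">20 & <40", "0")),
    Activity(1, Vector("C", ">20 & <40", "0")),
    Activity(2, Vector("A", "<20", "0")),
    Activity(3, Vector("B", ">20 & <40", "1")),
    Activity(3, Vector("B", ">40", "2"))
  )

  // Print whole fractions as "1" (not "1.0") to match the sample output.
  private def fmt(x: Double): String =
    if (x == math.rint(x)) x.toLong.toString else x.toString

  // "value:fraction|..." for one column of one user's records, values sorted as strings.
  def columnAffinity(values: Seq[String]): String = {
    val n = values.size.toDouble
    values
      .groupBy(identity)
      .map { case (v, hits) => v -> hits.size / n }
      .toSeq
      .sortBy(_._1)
      .map { case (v, frac) => s"$v:${fmt(frac)}" }
      .mkString("|")
  }

  // Group once by user, then compute every column from that user's records.
  def userAffinities(rows: Seq[Activity]): Map[Int, Vector[String]] =
    rows.groupBy(_.userId).map { case (user, userRows) =>
      val numCols = userRows.head.cols.size
      user -> Vector.tabulate(numCols)(i => columnAffinity(userRows.map(_.cols(i))))
    }
}
```

The point of the shape is that the data is grouped by user only once, and all columns are then computed from that user's small list of records.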

Presently I calculate these values one column at a time: I group by UserId
and C1 and compute the C1 fractions for all users, then group by UserId and
C2 and compute the C2 fractions, and so on. With around 30 columns this
means around 30 separate aggregations, each with its own shuffle, which is
quite slow. Since each user has at most 30 records, I would like to try a
second approach: do a groupByKey on UserId and pass each user's entire list
of records to a function that computes the percentages for all columns at
once. These are the steps I am trying to follow:

1. Dataframe1: group by UserId and count the records for each user, then
join the counts back to the input so that each record carries its user's
count.
2. Dataframe1.rdd.map(s => (s(1), s)).groupByKey().map(s => (s._1, myUserAggregator(s._2)))

import org.apache.spark.sql.Row
import scala.collection.mutable

def myUserAggregator(rows: Iterable[Row]): mutable.Map[Int, String] = {
  val returnValue = mutable.Map[Int, String]()
  if (rows != null && rows.nonEmpty) {
    // One value -> count map per column index. Each column gets its own map via
    // getOrElseUpdate; withDefaultValue(someMap) would share ONE map across all columns.
    val activityMap = mutable.Map[Int, mutable.Map[String, Int]]()

    // Per-user record count joined in step 1 (column 29); rows.size holds the same value.
    val sentCount = rows.head(29).toString.toInt

    for (row <- rows; i <- 0 until row.length) {
      val m = activityMap.getOrElseUpdate(i, mutable.Map[String, Int]().withDefaultValue(0))
      m(row(i).toString) += 1
    }

    for ((k, v) <- activityMap) {
      // toDouble avoids integer division (b / sentCount would be 0 whenever b < sentCount)
      val rowVal = v.map { case (a, b) => a + ":" + (b.toDouble / sentCount) }.mkString("|")
      returnValue.update(k, rowVal)
    }
  }
  returnValue
}
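Two details of the mutable-map approach in `myUserAggregator` are easy to miss, so here they are in isolation as a plain-Scala sketch. `PitfallsSketch` and its methods are hypothetical names for illustration. First, a map built with `withDefaultValue(someMap)` hands back the same `someMap` instance for every missing key (and does not store it), so separate columns end up counting into one shared map. Second, `Int / Int` truncates, so a count divided by a larger total comes out as 0.

```scala
import scala.collection.mutable

// Plain-Scala illustration of two pitfalls in a mutable-map aggregator.
object PitfallsSketch {
  // withDefaultValue returns the SAME default instance for every missing key and
  // does not store it, so two "different" columns update one shared counter map.
  def sharedDefault(): (Int, Int) = {
    val byColumn = mutable.Map[Int, mutable.Map[String, Int]]()
      .withDefaultValue(mutable.Map[String, Int]().withDefaultValue(0))
    byColumn(0)("A") += 1
    byColumn(1)("A") += 1
    (byColumn(0)("A"), byColumn.size) // column 0 sees both increments; nothing was stored
  }

  // getOrElseUpdate creates and stores a fresh counter map per column.
  def separateMaps(): (Int, Int) = {
    val byColumn = mutable.Map[Int, mutable.Map[String, Int]]()
    val col0 = byColumn.getOrElseUpdate(0, mutable.Map[String, Int]().withDefaultValue(0))
    col0("A") += 1
    val col1 = byColumn.getOrElseUpdate(1, mutable.Map[String, Int]().withDefaultValue(0))
    col1("A") += 1
    (byColumn(0)("A"), byColumn.size)
  }

  // Int / Int truncates toward zero; convert one side to Double for a fraction.
  def integerVsDouble(count: Int, total: Int): (Int, Double) =
    (count / total, count.toDouble / total)
}
```

With the shared default, column 0 reports a count of 2 after one increment per column and the outer map stays empty; with `getOrElseUpdate` each column keeps its own count of 1.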

When I run step 2, I get the following error. I am new to Scala and Spark
and cannot figure out how to pass the Iterable[Row] to a function and get
the results back.

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.map(RDD.scala:317)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:102)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:104)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:106)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:108)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:110)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:112)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:114)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:116)
......
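The trace is cut off at `......`, so the serialization stack that would name the offending object is not visible here. In spark-shell, though, a `def` typed at the prompt is compiled as a member of the REPL's wrapper classes (the `$iwC` frames above), so calling it from `map` can pull those wrappers, and whatever they reference, into the task. A commonly suggested workaround, sketched below under that assumption rather than as a confirmed fix, is to keep the per-user logic in a standalone object that reads only its arguments (ideally compiled into a jar passed to spark-shell with `--jars`). `UserAggregator`, `aggregate` and `skipCols` are hypothetical names; the function takes plain `Seq[Any]` rows (for a Spark `Row`, `row.toSeq`) so it can be tested without Spark, and it uses the number of rows instead of the joined count column.

```scala
// Hypothetical standalone aggregator (name and signature are illustrative).
// It reads only its arguments, so a Spark closure that calls it has no
// other shell state to drag along.
object UserAggregator extends Serializable {

  // rows: one user's records as plain sequences (for a Spark Row, row.toSeq).
  // skipCols: column indexes to leave out, e.g. the UserId column (assumed index).
  // Returns column index -> "value:fraction|value:fraction|...".
  def aggregate(rows: Iterable[Seq[Any]], skipCols: Set[Int] = Set.empty): Map[Int, String] = {
    val total = rows.size.toDouble // equals the per-user count from step 1
    if (total == 0) Map.empty
    else {
      val width = rows.head.size
      (0 until width).filterNot(skipCols).map { i =>
        // Count how often each value occurs in column i for this user.
        val counts = rows.groupBy(r => s"${r(i)}").map { case (v, rs) => v -> rs.size }
        val rendered = counts.toSeq
          .sortBy(_._1)
          .map { case (v, c) => s"$v:${c / total}" }
          .mkString("|")
        i -> rendered
      }.toMap
    }
  }
}

// Possible Spark wiring (not executed here; assumes UserId is column 0 of df):
//   df.rdd
//     .map(r => (r(0), r.toSeq))
//     .groupByKey()
//     .mapValues(rows => UserAggregator.aggregate(rows, skipCols = Set(0)))
```

Because the count comes from `rows.size`, this variant would also make the join in step 1 unnecessary.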


Thanks for the help.

Regards,
Neha Mehta
