I modified it to:


      detailInputsToGroup.map {
        case (detailInput, dataRecord) =>
          val key = new StringBuilder
          dimensions.foreach { dimension =>
            key ++= Option(dataRecord.get(dimension))
              .orElse(Option(detailInput.get(dimension)))
              .getOrElse("")
              .toString
          }
          (key.toString, (detailInput, dataRecord))
      }.reduceByKey {
        case (v1, v2) =>
          val v1Data = v1._2
          val v2Data = v2._2

          // A record that has not been merged yet represents one element, so
          // default to 1. Add both sides' counts rather than incrementing by
          // one, which would silently drop v2's tally.
          val v1Count = Option(v1Data.get("totalViCount")).map(_.asInstanceOf[Int]).getOrElse(1)
          val v2Count = Option(v2Data.get("totalViCount")).map(_.asInstanceOf[Int]).getOrElse(1)
          v1Data.getRecord.put("totalViCount", v1Count + v2Count)
          v1
      }.map {
        case (k, v) =>
          val schema = SchemaUtil.outputSchema(_detail)
          val detailOutputRecord = new DetailOutputRecord(detail, new SessionRecord(schema))

          // Compute dimensions
          DataUtil.populateDimensions(schema, dimensions.toArray, v._1, v._2, detailOutputRecord)

          // Construct output
          val wrap = new AvroKey[DetailOutputRecord](detailOutputRecord)
          (wrap, NullWritable.get)
      }
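To sanity-check the merge logic above, here is a minimal local sketch (plain Scala, not from the thread) of the reduceByKey counting pattern: seed every record with a count of 1, then sum counts when merging, so neither side's tally is lost.

```scala
// Local stand-in for rdd.map(r => (key(r), 1L)).reduceByKey(_ + _):
// every record starts with count 1; merging two partial results sums them.
val records = Seq("us" -> "a", "us" -> "b", "us" -> "c", "uk" -> "d")

val seeded = records.map { case (key, _) => (key, 1L) }
val totals: Map[String, Long] =
  seeded.groupBy(_._1).map { case (key, pairs) =>
    key -> pairs.map(_._2).reduce(_ + _) // same associative merge reduceByKey applies
  }
// totals == Map("us" -> 3L, "uk" -> 1L)
```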


How do I compute the unique count?
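One way to get both counts in a single pass without keeping the full list (a sketch, not from the thread; the itemId field name is taken from the original question): carry a (total, Set of itemIds) accumulator per key, with the same seqOp/combOp pair you would hand to Spark's aggregateByKey. Shown here with plain collections standing in for the RDD.

```scala
// (dimensionKey, itemId) pairs; itemId 1 repeats within "us".
val records: Seq[(String, Long)] = Seq("us" -> 1L, "us" -> 1L, "us" -> 2L, "uk" -> 3L)

// Accumulator: (totalCount, distinct itemIds seen so far)
val zero = (0L, Set.empty[Long])
def seqOp(acc: (Long, Set[Long]), itemId: Long) = (acc._1 + 1, acc._2 + itemId)
// combOp is what aggregateByKey would use to merge partial results across partitions.
def combOp(a: (Long, Set[Long]), b: (Long, Set[Long])) = (a._1 + b._1, a._2 ++ b._2)

// Locally: fold each key's values; on an RDD this would be
//   rdd.aggregateByKey(zero)(seqOp, combOp)
//      .mapValues { case (total, ids) => (total, ids.size.toLong) }
val counts = records.groupBy(_._1).map { case (key, vs) =>
  val (total, ids) = vs.map(_._2).foldLeft(zero)(seqOp)
  key -> (total, ids.size.toLong)
}
// counts("us") == (3L, 2L)  -- three views, two unique items
```

The Set can grow large for high-cardinality keys, which is where Daniel's HyperLogLogPlus suggestion below comes in: the accumulator shape stays the same, but the Set is swapped for a fixed-size sketch.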

On Tue, Jun 30, 2015 at 12:04 PM, Daniel Siegmann <
daniel.siegm...@teamaol.com> wrote:

> If the number of items is very large, have you considered using
> probabilistic counting? The HyperLogLogPlus
> <https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java>
> class from stream-lib <https://github.com/addthis/stream-lib> might be
> suitable.
>
> On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> I have an RDD of type (String,
>> Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
>> com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])
>>
>> Here String is the key and the Iterable holds the tuples for that key. I
>> got the above RDD after doing a groupByKey. I later want to compute the
>> total number of values for a given key and the total number of unique
>> values for the same key, and hence I do this:
>>
>>     val totalViCount = details.size.toLong
>>     val uniqueViCount =
>> details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>>
>> How do I do this using reduceByKey?
>>
>> *Total Code:*
>>
>>       val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] =
>>         detailInputsToGroup.map {
>>           case (detailInput, dataRecord) =>
>>             val key: StringBuilder = new StringBuilder
>>             dimensions.foreach { dimension =>
>>               key ++= Option(dataRecord.get(dimension))
>>                 .getOrElse(Option(detailInput.get(dimension)).getOrElse(""))
>>                 .toString
>>             }
>>             (key.toString, (detailInput, dataRecord))
>>         }.groupByKey
>>
>>       groupedDetail.map {
>>         case (key, values) => {
>>           val valueList = values.toList
>>
>>           // Compute dimensions (you can skip this)
>>           val (detailInput, dataRecord) = valueList.head
>>           val schema = SchemaUtil.outputSchema(_detail)
>>           val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema))
>>           DataUtil.populateDimensions(schema, dimensions.toArray,
>>             detailInput, dataRecord, detailOutput)
>>
>>           val metricsData = metricProviders.flatMap {
>>             case (className, instance) =>
>>               val data = instance.getMetrics(valueList)
>>               ReflectionUtil.getData(data, _metricProviderMemberNames(className))
>>           }
>>           metricsData.map { case (k, v) => detailOutput.put(k, v) }
>>           val wrap = new AvroKey[DetailOutputRecord](detailOutput)
>>           (wrap, NullWritable.get)
>>         }
>>       }
>>
>>
>> // getMetrics:
>>   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
>>     val totalViCount = details.size.toLong
>>     val uniqueViCount =
>>       details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>>     new ViewItemCountMetric(totalViCount, uniqueViCount)
>>   }
>>
>>
>> I understand that totalViCount can be implemented using reduceByKey. How
>> can I implement the total unique count, since I need to have the full list
>> to know the unique values?
>>
>> --
>> Deepak
>>
>>
>
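If only reduceByKey is available, one hedged sketch (not from the thread; plain collections stand in for the RDD): dedupe on the composite (key, itemId) pair first, then count what survives. This avoids holding a full per-key list at any point.

```scala
// Stand-in for the two-stage RDD pipeline:
//   rdd.map { case (key, (detail, _)) => ((key, detail.get("itemId")), 1) }
//      .reduceByKey((a, _) => a)           // dedupe (key, itemId) pairs
//      .map { case ((key, _), _) => (key, 1L) }
//      .reduceByKey(_ + _)                 // count distinct itemIds per key
val records: Seq[(String, Long)] = Seq("us" -> 1L, "us" -> 1L, "us" -> 2L, "uk" -> 3L)

// One entry per distinct (key, itemId) pair.
val deduped = records.map { case (key, itemId) => ((key, itemId), 1) }
  .groupBy(_._1).keys

// Count the surviving pairs per key.
val uniquePerKey: Map[String, Long] =
  deduped.toSeq.map { case (key, _) => (key, 1L) }
    .groupBy(_._1).map { case (key, ps) => key -> ps.map(_._2).sum }
// uniquePerKey == Map("us" -> 2L, "uk" -> 1L)
```

The cost is an extra shuffle compared to the Set or HyperLogLogPlus accumulators, but each reduce step only ever sees two values at a time.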


-- 
Deepak
