For the curious, I played around with a UDAF for this (shown below). The
downside is that it builds a Map of every distinct value of the column
within a group, and that Map has to be held in memory.

I suspect some kind of sorted groupByKey + cogroup could stream the values
through instead, though it might then not support partial aggregation. I'll
try that next.
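
As a rough illustration of the streaming idea (plain Scala, no Spark;
modeOfSorted is a made-up helper name): if a group's values arrive already
sorted, the mode can be found by counting runs of equal values, keeping only
the current run and the best run so far in memory. This only sketches the
per-group logic. It does not show how to get Spark to deliver sorted values,
and it does not address partial aggregation.

```scala
// Plain-Scala sketch: mode of an already-sorted stream of values.
// `modeOfSorted` is a hypothetical helper, not part of any Spark API.
def modeOfSorted(sorted: Iterator[String]): Option[String] = {
  var best: Option[String] = None   // most frequent value seen so far
  var bestCount = 0L
  var current: String = null        // value of the run being counted
  var currentCount = 0L

  for (value <- sorted if value != null) {   // ignore nulls, like the UDAF
    if (value == current) {
      currentCount += 1
    } else {
      current = value
      currentCount = 1
    }
    // Strict > keeps the earliest value in sort order on ties.
    if (currentCount > bestCount) {
      best = Some(current)
      bestCount = currentCount
    }
  }
  best
}

// Usage: sort first, then stream.
val colors = Seq("black", "orange", "black", "orange", "black")
val colorMode = modeOfSorted(colors.sorted.iterator)
```

Only four variables are kept per group, whatever the number of distinct
values, which is the property the Map-based UDAF lacks.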

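import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * [[UserDefinedAggregateFunction]] for computing the mode of a string column.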
  *
  * WARNING: This will assemble a Map of all possible values in memory.
  *
  * It'll ignore null values and return null if all values are null.
  */
class ModeAggregateFunction extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[String, Long]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input == null || input.getString(0) == null) {
      return
    }

    val value = input.getString(0)
    val frequencies = buffer.getAs[Map[String, Long]](0)
    val count = frequencies.getOrElse(value, 0L)

    buffer(0) = frequencies + (value -> (count + 1L))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val frequencies1 = buffer1.getAs[Map[String, Long]](0)
    val frequencies2 = buffer2.getAs[Map[String, Long]](0)

    // Sum the counts for keys present in both maps; keep the rest as-is.
    buffer1(0) = frequencies1 ++ frequencies2.map {
      case (k, v) => k -> (v + frequencies1.getOrElse(k, 0L))
    }
  }

  override def evaluate(buffer: Row): Any = {
    val frequencies = buffer.getAs[Map[String, Long]](0)
    // On ties, maxBy picks whichever entry comes first in the Map's
    // iteration order.
    if (frequencies.isEmpty) null else frequencies.maxBy(_._2)._1
  }
}
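
For reference, here is one way the UDAF above might be applied to a set of
columns that's only known at runtime. This is a sketch against the Spark 2.x
DataFrame API. modePerGroup is a made-up helper name, and input refers to
the DataFrame from the quoted example below.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

// Hypothetical helper (not a Spark API): applies the UDAF to each value column.
def modePerGroup(df: DataFrame, key: String, valueColumns: Seq[String]): DataFrame = {
  require(valueColumns.nonEmpty, "need at least one value column")
  val mode = new ModeAggregateFunction()
  // One aggregate expression per column, aliased back to the column's name.
  val aggs: Seq[Column] = valueColumns.map(c => mode(col(c)).as(c))
  df.groupBy(col(key)).agg(aggs.head, aggs.tail: _*)
}

// Columns other than the grouping key, discovered at runtime.
val valueColumns = input.columns.filter(_ != "creature").toSeq
val result = modePerGroup(input, "creature", valueColumns)
```

For the sample data this should give the rows in expectedOutput, though the
row order isn't guaranteed.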




On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> One common situation I run across is that I want to compact my data and
> select the mode (most frequent value) in several columns for each group.
>
> Even calculating mode for one column in SQL is a bit tricky. The ways I've
> seen usually involve a nested sub-select with a group by + count and then a
> window function using rank().
>
> However, what if you want to calculate the mode for several columns,
> producing a new row with the results? And let's say the set of columns is
> only known at runtime.
>
> In Spark SQL, I start going down a road of many self-joins. The more
> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
> each group.
>
> What's the best way?
>
> Here's a contrived example:
>
> val input = spark.sparkContext.parallelize(Seq(
>     ("catosaur", "black", "claws"),
>     ("catosaur", "orange", "scales"),
>     ("catosaur", "black", "scales"),
>     ("catosaur", "orange", "scales"),
>     ("catosaur", "black", "spikes"),
>     ("bearcopter", "gray", "claws"),
>     ("bearcopter", "black", "fur"),
>     ("bearcopter", "gray", "flight"),
>     ("bearcopter", "gray", "flight")))
>     .toDF("creature", "color", "feature")
>
> +----------+------+-------+
> |creature  |color |feature|
> +----------+------+-------+
> |catosaur  |black |claws  |
> |catosaur  |orange|scales |
> |catosaur  |black |scales |
> |catosaur  |orange|scales |
> |catosaur  |black |spikes |
> |bearcopter|gray  |claws  |
> |bearcopter|black |fur    |
> |bearcopter|gray  |flight |
> |bearcopter|gray  |flight |
> +----------+------+-------+
>
> val expectedOutput = spark.sparkContext.parallelize(Seq(
>     ("catosaur", "black", "scales"),
>     ("bearcopter", "gray", "flight")))
>     .toDF("creature", "color", "feature")
>
> +----------+-----+-------+
> |creature  |color|feature|
> +----------+-----+-------+
> |catosaur  |black|scales |
> |bearcopter|gray |flight |
> +----------+-----+-------+
>
