For the curious, I played around with a UDAF for this (shown below). The downside is that it assembles a Map of every distinct value of the column, which has to be held in memory in the aggregation buffer.
I suspect some kind of sorted groupByKey + cogroup could stream values through, though it might not support partial aggregation then. Will try that next; a rough sketch of that idea is at the bottom of this message, after the quoted thread.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * [[UserDefinedAggregateFunction]] for computing the mode of a string column.
 *
 * WARNING: This will assemble a Map of all possible values in memory.
 *
 * It'll ignore null values and return null if all values are null.
 */
class ModeAggregateFunction extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  // This is the initial value for the buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[String, Long]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input == null || input.getString(0) == null) {
      return
    }
    val value = input.getString(0)
    val frequencies = buffer.getAs[Map[String, Long]](0)
    val count = frequencies.getOrElse(value, 0L)
    buffer(0) = frequencies + (value -> (count + 1L))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val frequencies1 = buffer1.getAs[Map[String, Long]](0)
    val frequencies2 = buffer2.getAs[Map[String, Long]](0)
    // Sum the per-partition counts for each value.
    buffer1(0) = frequencies1 ++ frequencies2.map { case (k, v) =>
      k -> (v + frequencies1.getOrElse(k, 0L))
    }
  }

  override def evaluate(buffer: Row): Any = {
    val frequencies = buffer.getAs[Map[String, Long]](0)
    if (frequencies.isEmpty) {
      return null
    }
    frequencies.maxBy(_._2)._1
  }
}
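In case it's useful, here's roughly how I'm calling it. The column names are just the ones from the contrived example in the quoted mail below; in my real job, valueCols comes from runtime configuration:

import org.apache.spark.sql.functions.col

val mode = new ModeAggregateFunction()

// Columns to take the mode of; only known at runtime.
val valueCols = Seq("color", "feature")
val aggExprs = valueCols.map(c => mode(col(c)).as(c))

val compacted = input
  .groupBy(col("creature"))
  .agg(aggExprs.head, aggExprs.tail: _*)

Each distinct value per column still ends up in the buffer's Map, but at least building the agg expressions per column avoids the self-joins.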
On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> One common situation I run across is that I want to compact my data and
> select the mode (most frequent value) in several columns for each group.
>
> Even calculating mode for one column in SQL is a bit tricky. The ways I've
> seen usually involve a nested sub-select with a group by + count and then a
> window function using rank().
>
> However, what if you want to calculate the mode for several columns,
> producing a new row with the results? And let's say the set of columns is
> only known at runtime.
>
> In Spark SQL, I start going down a road of many self-joins. The more
> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
> each group.
>
> What's the best way?
>
> Here's a contrived example:
>
> val input = spark.sparkContext.parallelize(Seq(
>   ("catosaur", "black", "claws"),
>   ("catosaur", "orange", "scales"),
>   ("catosaur", "black", "scales"),
>   ("catosaur", "orange", "scales"),
>   ("catosaur", "black", "spikes"),
>   ("bearcopter", "gray", "claws"),
>   ("bearcopter", "black", "fur"),
>   ("bearcopter", "gray", "flight"),
>   ("bearcopter", "gray", "flight")))
>   .toDF("creature", "color", "feature")
>
> +----------+------+-------+
> |creature  |color |feature|
> +----------+------+-------+
> |catosaur  |black |claws  |
> |catosaur  |orange|scales |
> |catosaur  |black |scales |
> |catosaur  |orange|scales |
> |catosaur  |black |spikes |
> |bearcopter|gray  |claws  |
> |bearcopter|black |fur    |
> |bearcopter|gray  |flight |
> |bearcopter|gray  |flight |
> +----------+------+-------+
>
> val expectedOutput = spark.sparkContext.parallelize(Seq(
>   ("catosaur", "black", "scales"),
>   ("bearcopter", "gray", "flight")))
>   .toDF("creature", "color", "feature")
>
> +----------+-----+-------+
> |creature  |color|feature|
> +----------+-----+-------+
> |catosaur  |black|scales |
> |bearcopter|gray |flight |
> +----------+-----+-------+
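For completeness, here's the rough shape of the groupByKey alternative I mentioned up top. Untested sketch (the helper name modesPerGroup and the assumption that the key and value columns are all strings are mine); it makes a single pass over each group's rows but still keeps one count map per value column, so memory is only bounded per group:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.mutable

// Sketch: per-group mode of several string columns in one pass per group.
// valueCols is only known at runtime.
def modesPerGroup(df: DataFrame, keyCol: String, valueCols: Seq[String]): DataFrame = {
  import df.sparkSession.implicits._

  val modes = df
    .groupByKey(_.getAs[String](keyCol))
    .mapGroups { (key, rows) =>
      // One running count map per value column.
      val counts = Array.fill(valueCols.size)(mutable.Map.empty[String, Long])
      rows.foreach { row =>
        valueCols.indices.foreach { i =>
          val v = row.getAs[String](valueCols(i))
          if (v != null) counts(i)(v) = counts(i).getOrElse(v, 0L) + 1L
        }
      }
      // Most frequent value per column; null if a column was all null.
      key +: counts.toSeq.map(m => if (m.isEmpty) null else m.maxBy(_._2)._1)
    }

  val schema = StructType((keyCol +: valueCols).map(StructField(_, StringType)))
  df.sparkSession.createDataFrame(modes.rdd.map(Row.fromSeq), schema)
}

The open question from my first mail still stands, though: unlike the UDAF, mapGroups can't do partial aggregation, so all of a group's rows get shuffled to one task.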