Have you looked at aggregators?

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
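
Roughly, the idea would be to define a typed Aggregator that collects each
group's values into a list, then run it with groupBy(...).agg(...).  Below is a
minimal, untested sketch against the 1.6 Aggregator API; the CollectKeyVals and
nested names are just placeholders, it assumes a live sqlContext (e.g. the
shell), and it assumes an Encoder for List[KeyVal] can be resolved from the
implicits, which is the part most likely to bite on 1.6:

import org.apache.spark.sql.expressions.Aggregator
import sqlContext.implicits._

case class KeyVal(k: Int, v: Int)

// Typed aggregator that collects every KeyVal in a group into a List[KeyVal];
// the buffer and the output are both List[KeyVal].
object CollectKeyVals extends Aggregator[KeyVal, List[KeyVal], List[KeyVal]] {
  def zero: List[KeyVal] = Nil
  def reduce(buf: List[KeyVal], kv: KeyVal): List[KeyVal] = buf :+ kv
  def merge(b1: List[KeyVal], b2: List[KeyVal]): List[KeyVal] = b1 ++ b2
  def finish(buf: List[KeyVal]): List[KeyVal] = buf
}

// same Dataset as in your example below
val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i, j)).toDS()

// group on the key, then run the aggregator once per group;
// the result should be a Dataset[(Int, List[KeyVal])]
val nested = keyVals.groupBy(_.k).agg(CollectKeyVals.toColumn)
nested.show()

Because the aggregation runs as a typed column, it folds values in as it goes
rather than handing you a per-group Iterator the way mapGroups does, which may
also sidestep the serialization problem you're hitting.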

On Fri, Apr 22, 2016 at 6:45 PM, Lee Becker <lee.bec...@hapara.com> wrote:

> Is there a way to do aggregateByKey on Datasets the way one can on an RDD?
>
> Consider the following RDD code to build a set of KeyVals into a DataFrame
> containing a column with the KeyVals' keys and a column containing lists of
> KeyVals.  The end goal is to join it with collections which will be
> similarly transformed.
>
> case class KeyVal(k: Int, v: Int)
>
>
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i,j))
>
> // function for appending to list
> val addToList = (s: List[KeyVal], v: KeyVal) => s :+ v
>
> // function for merging two lists
> val addLists = (s: List[KeyVal], t: List[KeyVal]) => s++t
>
> val keyAndKeyVals = keyVals.map(kv=> (kv.k, kv))
> val keyAndNestedKeyVals = keyAndKeyVals.
>   aggregateByKey(List[KeyVal]())(addToList, addLists).
>   toDF("key", "keyvals")
> keyAndNestedKeyVals.show
>
>
> which produces:
>
> +---+--------------------+
> |key|             keyvals|
> +---+--------------------+
> |  1|[[1,4], [1,5], [1...|
> |  2|[[2,4], [2,5], [2...|
> |  3|[[3,4], [3,5], [3...|
> +---+--------------------+
>
> For a Dataset approach I tried the following to no avail:
>
> // Initialize as Dataset
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i,j)).
>   toDS
>
> // Build key, keyVal mappings
> val keyValsByKey = keyVals.groupBy(kv=>(kv.k))
>
> case class NestedKeyVal(key: Int, keyvals: List[KeyVal])
>
> val convertToNested = (key: Int, keyValsIter: Iterator[KeyVal]) =>
>   NestedKeyVal(key = key, keyvals = keyValsIter.toList)
>
> val keyValsNestedByKey = keyValsByKey.mapGroups((key, keyvals) =>
>   convertToNested(key, keyvals))
> keyValsNestedByKey.show
>
>
> This and several other incantations using groupBy + mapGroups consistently
> give me serialization problems.  Is this because the iterator cannot be
> guaranteed across boundaries?
> Or is there some issue with what a Dataset can encode in the interim?
> What other ways might I approach this problem?
>
> Thanks,
> Lee
>
>
