I think you should be able to write an Aggregator <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>. You probably want to run in update mode if you want it to output any group that has changed in the batch.
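Something along these lines might work (a rough sketch, not tested; the Event case class, the sum-based merge logic, and the rate source are all placeholders for your actual job):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.streaming.OutputMode

// Placeholder for your event type.
case class Event(key: String, count: Long)

// Typed Aggregator; reduce/merge stand in for your custom merge(a, b).
object MergeEvents extends Aggregator[Event, Long, Long] {
  def zero: Long = 0L
  def reduce(buf: Long, e: Event): Long = buf + e.count
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(buf: Long): Long = buf
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val spark = SparkSession.builder.appName("merge-agg").getOrCreate()
import spark.implicits._

// Toy streaming source; replace with your real one.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .select($"value")
  .as[Long]
  .map(v => Event(key = (v % 10).toString, count = 1L))

// Update mode emits only the groups whose aggregate changed in the batch.
val query = events
  .groupByKey(_.key)
  .agg(MergeEvents.toColumn.name("merged"))
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()

query.awaitTermination()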
On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com> wrote:

> Hi,
> We are migrating some jobs from DStreams to Structured Streaming.
>
> Currently, to handle aggregations, we call map and reduceByKey on each
> RDD, like:
>
> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>
> The final output of each RDD is merged into the sink, which supports
> aggregation (like a coprocessor in HBase).
>
> In the new Dataset API, I am not finding any suitable API to aggregate
> over the micro-batch.
> Most of the aggregation APIs use the state store and provide global
> aggregations (and with append mode they do not emit changes to existing
> buckets).
>
> The problems we suspect are:
> 1) The state store is tightly linked to the job definition, while in our
> case we may want to edit the job while keeping the previously calculated
> aggregates as they are.
>
> The desired result can be achieved with the Dataset APIs below:
>
> dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
>
> but on inspecting the physical plan, it does not perform any merge before
> the sort.
>
> Is anyone aware of an API or other workarounds to get the desired result?
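Regarding the observation above that the plan shows no merge before the sort: mapGroups does no map-side combine, while a typed Aggregator lets the planner add a partial aggregate before the exchange. A quick way to see this in batch mode (toy data and a stand-in sum aggregator, for illustration only):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession.builder.appName("plan-check").master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(("a", 1L), ("a", 2L), ("b", 3L)).toDS()

// mapGroups: every row for a key crosses the shuffle; the plan shows no
// partial merge before the sort, matching the observation above.
ds.groupByKey(_._1)
  .mapGroups((key, values) => (key, values.map(_._2).sum))
  .explain()

// Stand-in for the custom merge logic.
val sumAgg = new Aggregator[(String, Long), Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: (String, Long)): Long = b + a._2
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(b: Long): Long = b
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Typed Aggregator: the plan now includes a partial aggregate before the
// exchange, similar to reduceByKey's map-side merge.
ds.groupByKey(_._1).agg(sumAgg.toColumn.name("sum")).explain()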