[ https://issues.apache.org/jira/browse/FLINK-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14221432#comment-14221432 ]
Viktor Rosenfeld edited comment on FLINK-1269 at 11/21/14 9:09 PM:
-------------------------------------------------------------------
Hi Sebastian,
you can find a first version that implements (grouped) count in this tree:
https://github.com/he-sk/incubator-flink/tree/aggregation
The API for a grouped count would be:
{noformat}
DataSet ds = ...
DataSet output = ds.groupBy(0).aggregate(count());
{noformat}
or:
{noformat}
DataSet output = ds.groupBy(0).count().aggregate();
{noformat}
Which of the two variants would you prefer? Your version, a groupCount function,
could be offered as a convenience function on top of the more general API.
Unfortunately, there is no Scala API yet.
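To make the expected result of a grouped count concrete, here is a minimal sketch in plain Java that uses no Flink classes. It only illustrates the semantics (one count per distinct key) and is not the code in the aggregation branch. The class name GroupCountSketch and the use of a java.util.Map in place of a DataSet are assumptions made for this example; the name groupCount is borrowed from the issue description.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration class; not part of Flink or of the aggregation branch.
public class GroupCountSketch {

    // Counts how many elements share each key.
    // A LinkedHashMap keeps the keys in the order they were first seen.
    public static <T, K> Map<K, Long> groupCount(Iterable<T> data, Function<T, K> extractKey) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (T element : data) {
            // Start at 1 for a new key, otherwise add 1 to the existing count.
            counts.merge(extractKey.apply(element), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a", "b");
        // Group by the element itself and print the size of each group.
        System.out.println(groupCount(words, w -> w)); // prints {a=3, b=2, c=1}
    }
}
```

Either API variant above would be expected to yield the same (key, count) pairs as this sketch, only computed in a distributed fashion on a DataSet.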
> Easy way to "group count" dataset
> ---------------------------------
>
> Key: FLINK-1269
> URL: https://issues.apache.org/jira/browse/FLINK-1269
> Project: Flink
> Issue Type: New Feature
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Sebastian Schelter
> Assignee: Viktor Rosenfeld
>
> Flink should offer an easy way to group datasets and compute the sizes of the
> resulting groups. This is one of the most essential operations in distributed
> processing, yet it is very hard to implement in Flink.
> I assume it could be a show-stopper for people trying Flink, because at the
> moment, users have to perform the grouping and then write a groupReduce that
> counts the tuples in the group and extracts the group key at the same time.
> Here is what I would semantically expect to happen:
> {noformat}
> def groupCount[T, K](data: DataSet[T], extractKey: (T) => K): DataSet[(K, Long)] = {
>   data.groupBy { extractKey }
>     .reduceGroup { group => countBy(extractKey, group) }
> }
>
> private[this] def countBy[T, K](extractKey: T => K,
>     group: Iterator[T]): (K, Long) = {
>   val key = extractKey(group.next())
>   var count = 1L
>   while (group.hasNext) {
>     group.next()
>     count += 1
>   }
>   key -> count
> }
> {noformat}
>