[
https://issues.apache.org/jira/browse/FLINK-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14221553#comment-14221553
]
Ufuk Celebi commented on FLINK-1269:
------------------------------------
I think Sebastian's suggestion and Viktor's aggregations are two different
things:
{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));
DataSet<Long> counts = input.groupBy(0).count(); // result: 2, 1
{code}
From the tests in Viktor's branch, I think his changes are a better API than what
we have right now (SQL-like aggregations):
{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));
DataSet<Tuple2<Integer, Integer>> counts = input.groupBy(0).count(); // result: (2, 2), (1, 0)
{code}
Is this right?
There will also be further overlap with an "action"-style count operator, which
returns a result to the client (see
https://github.com/apache/incubator-flink/pull/210).
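For reference, here is a minimal sketch of how that action-style variant might look
from the user's side (the eager count() returning the value to the client is an
assumption based on the linked PR, not a final API):
{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
        new Tuple2<Integer, Integer>(0, 1),
        new Tuple2<Integer, Integer>(0, 2),
        new Tuple2<Integer, Integer>(1, 0));

// "Action"-style count: triggers execution and hands the value back to the
// client program instead of producing a DataSet (semantics assumed from the PR).
long total = input.count(); // result: 3
{code}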
For the first example, I had a PR some time ago (FLINK-758 and
https://github.com/apache/incubator-flink/pull/63), which I closed because it
wasn't really picked up.
All in all, I think we should aim for an API that supports all three versions of
count (and other aggregations where applicable).
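Purely as an illustration, the three variants could look roughly like this side by
side (method names such as countPerGroup() are made up here just to keep the shapes
apart, not an agreed-upon design):
{code}
// 1. Group count (Sebastian's suggestion): key plus group size.
//    countPerGroup() is a hypothetical name.
DataSet<Tuple2<Integer, Long>> groupSizes = input.groupBy(0).countPerGroup();

// 2. SQL-like aggregation (Viktor's branch): count alongside the other
//    field-wise aggregations on the grouped data.
DataSet<Tuple2<Integer, Integer>> aggregated = input.groupBy(0).count();

// 3. "Action"-style count: executes the program and returns the value
//    directly to the client.
long total = input.count();
{code}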
> Easy way to "group count" dataset
> ---------------------------------
>
> Key: FLINK-1269
> URL: https://issues.apache.org/jira/browse/FLINK-1269
> Project: Flink
> Issue Type: New Feature
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Sebastian Schelter
> Assignee: Viktor Rosenfeld
>
> Flink should offer an easy way to group datasets and compute the sizes of the
> resulting groups. This is one of the most essential operations in distributed
> processing, yet it is very hard to implement in Flink.
> I assume it could be a show-stopper for people trying Flink, because at the
> moment, users have to perform the grouping and then write a groupReduce that
> counts the tuples in the group and extracts the group key at the same time.
> Here is what I would semantically expect to happen:
> {noformat}
> def groupCount[T, K](data: DataSet[T], extractKey: (T) => K): DataSet[(K, Long)] = {
>   data.groupBy { extractKey }
>     .reduceGroup { group => countBy(extractKey, group) }
> }
>
> private[this] def countBy[T, K](extractKey: T => K, group: Iterator[T]): (K, Long) = {
>   val key = extractKey(group.next())
>   var count = 1L
>   while (group.hasNext) {
>     group.next()
>     count += 1
>   }
>   key -> count
> }
> {noformat}
>