[
https://issues.apache.org/jira/browse/FLINK-65?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stephan Ewen resolved FLINK-65.
-------------------------------
Resolution: Fixed
Fix Version/s: 0.7-incubating (was: pre-apache)
Assignee: Stephan Ewen
Solved through the mapPartition() function, added in commit
d4de9774b3237bb1850024b1208640bc50f7adab
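A minimal sketch of the resulting pattern, assuming the MapPartitionFunction signature of the current Java DataSet API (class and variable names are illustrative, not taken from the commit): partial sums are built in a hash map inside each partition, so only one record per distinct key is shipped to the final reduce.
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PreAggregateExample {

    // Hash-based pre-aggregation inside one partition: only one record per
    // distinct key leaves the operator, instead of one record per input.
    public static class PartialSum
            implements MapPartitionFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        @Override
        public void mapPartition(Iterable<Tuple2<String, Long>> values,
                                 Collector<Tuple2<String, Long>> out) {
            Map<String, Long> sums = new HashMap<>();
            for (Tuple2<String, Long> v : values) {
                Long current = sums.get(v.f0);
                sums.put(v.f0, current == null ? v.f1 : current + v.f1);
            }
            for (Map.Entry<String, Long> e : sums.entrySet()) {
                out.collect(new Tuple2<>(e.getKey(), e.getValue()));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> input = env.fromElements(
                new Tuple2<>("a", 1L), new Tuple2<>("b", 1L), new Tuple2<>("a", 1L));

        input.mapPartition(new PartialSum())   // local, in-memory pre-aggregation
             .groupBy(0).sum(1)                // final aggregation after repartitioning
             .print();
    }
}
{code}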
> Support custom and efficient (in-memory) pre-aggregations (without Combiner)
> ----------------------------------------------------------------------------
>
> Key: FLINK-65
> URL: https://issues.apache.org/jira/browse/FLINK-65
> Project: Flink
> Issue Type: Improvement
> Reporter: GitHub Import
> Assignee: Stephan Ewen
> Labels: github-import
> Fix For: 0.7-incubating
>
>
> I use and evaluate Stratosphere in the course of my thesis and want to give
> feedback on what I found missing or what could be improved. This is written
> from a user perspective.
> ### Requirement
> We have partitioned input that we want to group and aggregate (by a
> grouping-key) into one or multiple records. The input is either streamed from
> HDFS or pipelined from other contracts. Our input is not (necessarily)
> partitioned by the grouping-key. The input we want to aggregate is the input
> of any tuple-at-a-time contract such as Map, Cross or Match. We can express
> this requirement in terms of SQL: “SELECT key, aggregate(value) FROM records
> GROUP BY key”. Common cases are keys with cardinality 1 or 2, but also any
> other small, medium or high cardinality.
> ### Problem
> Currently only the Combine+Reduce/CoGroup strategy is supported: Forward all
> data to a Combiner, repartition its output by the grouping-key and do the
> final aggregation in Reduce. **For N records this always involves emission of
> N intermediate records to the Combiner**. Even though the values are just
> forwarded to the local Combiner, there is a lot of copying, (de)serialization
> and probably some networking overhead involved (jobmanager status). Also the
> user has to manually write the code to serialize the input to a record and
> include the Combiner. If we want to do a join and aggregate, this could mean
> that a udf does nothing but forward the joined tuples to a
> Combiner/Reducer.
> ### Goal
> Enable efficient in-memory aggregation (for the case where efficiency matters
> - otherwise I can use the Combiner/Reducer approach), reduce the
> (de)serialization overhead, reduce the “stupid” code to be written by the
> user (e.g. forward all tuples to a combiner).
> ### Use cases
> Some use cases that would benefit from efficient pre-aggregation:
> * Group and aggregate with low group-key cardinality: Let’s assume the
> cardinality is 1 and we want to do a simple aggregation like a sum. It is
> obviously much more efficient (and easier to code) to just do the
> pre-aggregation (sum) in the udf and send a single record to a reducer.
> * Wordcount: “SELECT word, sum(occurrence) FROM word-occurrences GROUP BY
> word”. In this case occurrence is a column that has the constant value 1.
> Very simple to aggregate in-memory.
> * Machine Learning: Accuracy computation: “SELECT correctly-classified,
> sum(correctly-classified) GROUP BY correctly-classified”. E.g. if we trained
> a model (e.g. a numeric weight vector for logistic regression), the accuracy
> is defined as #correctly-classified/#total. I do this in one of my jobs and
> currently have to emit N records with constant value true or false to the
> Combiner (see the counter sketch after this list).
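> A minimal counter sketch for the accuracy case (plain Java, independent of
> any particular contract API; names are illustrative): the udf keeps two local
> counters and emits a single pre-aggregated record instead of N boolean
> records.
> ```java
> import java.util.Iterator;
>
> /** Pre-aggregates a stream of classification results into one (correct, total) pair. */
> class AccuracyPreAggregator {
>     static long[] preAggregate(Iterator<Boolean> correctlyClassified) {
>         long correct = 0;
>         long total = 0;
>         while (correctlyClassified.hasNext()) {
>             if (correctlyClassified.next()) {
>                 correct++;      // count correct classifications
>             }
>             total++;            // count every record this udf instance has seen
>         }
>         // Emit exactly one record per udf instance; a final Reduce sums the
>         // partial (correct, total) pairs and computes correct / total.
>         return new long[] { correct, total };
>     }
> }
> ```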
> ### Possible solutions
> 1. Give close() the option to write output in all tuple-at-a-time contracts
> (the Hadoop way).
> 2. Give the UDF knowledge about whether the current element is the last one
> (hasNext). Almost the same as 1.
> 3. Add an iterator option to tuple-at-a-time contracts (map, cross, match).
> The contracts can be configured to pass an iterator over all records that are
> pipelined/streamed to this udf, e.g. via
> CrossContract.builder(MyCross.class).asIterator(true)....build(). I assume
> that this is easy to implement because the code that calls the udf probably
> looks like “while (it.hasNext()) { udf(it.next()) }”. Not sure if this is
> true. The user would then implement a separate stub, e.g. MatchIteratorStub
> instead of MatchStub (a rough sketch of such a stub follows this list).
> 4. Keep it as it is (Combiner is the only way to pre-aggregate)
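> A purely hypothetical sketch of what such an iterator stub for solution 3
> could look like (none of these types exist; the names and the signature are
> made up for illustration):
> ```java
> import java.util.Iterator;
>
> // Minimal collector abstraction, only so that the sketch is self-contained.
> interface Collector<T> {
>     void collect(T record);
> }
>
> // Hypothetical iterator-style stub: instead of one call per record, the
> // runtime hands the udf an iterator over all records pipelined to it, so the
> // udf can pre-aggregate in memory and emit only a few records at the end.
> interface MapIteratorStub<IN, OUT> {
>     void map(Iterator<IN> records, Collector<OUT> out);
> }
> ```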
> ### Discussion
> * The current way, to use a Combiner, is very explicit and gives the system
> more knowledge about what happens. In an ideal world, the optimizer chooses
> how to do the pre-aggregation, and we would just define the aggregation
> function in the combiner. Currently however, we have to hardcode the
> serialization code that forwards everything to the Combiner and the system
> would have to understand and modify the udf to get rid of the serialization
> and to do a direct pre-aggregation.
> * Solutions 1-3 do more or less the same. We can write our own
> pre-aggregation/combiner code. If the cardinality is 1, this is just a
> counter; if the cardinality is medium, we can use a hash table. After the udf
> has processed all records, it can send one or more pre-aggregated records to
> the Reducer. This is less explicit, but more powerful and enables high
> efficiency. The system however still knows that we do a grouping, because we
> still have to use Reduce.
> * To make it easy for Hadoop users to switch over, solution 1 or 2 would be
> fine. Solution 3 is basically the same, but looks a bit different.
> Looking forward to hearing your opinion. I hope I didn't miss that such a
> feature already exists ;)
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/65
> Created by: [andrehacker|https://github.com/andrehacker]
> Labels:
> Created at: Wed Aug 21 16:50:05 CEST 2013
> State: open
--
This message was sent by Atlassian JIRA
(v6.2#6252)