[
https://issues.apache.org/jira/browse/FLINK-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14229811#comment-14229811
]
ASF GitHub Bot commented on FLINK-1293:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/243#discussion_r21089588
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/AggregationOperator.java
---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationMapFinalUdf;
+import org.apache.flink.api.java.aggregation.AggregationMapIntermediateUdf;
+import org.apache.flink.api.java.aggregation.AggregationReduceUdf;
+import org.apache.flink.api.java.aggregation.AverageAggregationFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Aggregation operator.
+ *
+ * <p>
+ * The operator has the following dependencies.
+ *
+ * <dl>
+ * <dt>functions
+ * <dd>Aggregation functions that should be computed (may contain
+ * composites).
+ * <dt>intermediateFunctions
+ * <dd>Aggregation functions that do the actual aggregation (may
+ * contain keys and intermediates expanded from composites).
+ * <dt>resultType
+ * <dd>Type information for the final result tuple.
+ * <dt>intermediateType
+ * <dd>Type information for the intermediate tuples.
+ * <dt>groupKeys
+ * <dd>Fields on which the input should be grouped.
+ * </dl>
+ *
+ * <p>A tuple field may be aggregated using multiple functions. It is
+ * therefore necessary to construct an intermediate tuple holding a copy
+ * of the field value for each aggregation function that uses it. Also,
+ * an aggregation function may extend the input tuple with additional
+ * information. For example, {@link AverageAggregationFunction} adds a
field
+ * to count the tuples. Finally, during the aggregation group keys must be
+ * present but they may not be in the final output.
+ *
+ * <p>Therefore this operator maps to 3 internal Flink operations:
+ *
+ * <code>Input -> Map1 -> Reduce -> Map2 -> Output</code>
+ *
+ * <dl>
+ * <dt>Map1
+ * <dd>Maps input tuples to intermediate tuples; copies
+ * and/or initializes fields.
+ * <dt>Reduce
+ * <dd>Performs the actual aggregation
+ * <dt>Map2
+ * <dd>Computes composite aggregations and drops unwanted key
fields.
+ * </dl>
+ *
+ * @param <IN> The input type (must be Tuple).
+ * @param <OUT> The output type (must extend Tuple).
+ */
+public class AggregationOperator<IN, OUT extends Tuple> extends
SingleInputOperator<IN, OUT, AggregationOperator<IN, OUT>> {
+
+ private TypeInformation<Tuple> intermediateType;
+ private int[] groupKeys;
+ private AggregationFunction<?, ?>[] intermediateFunctions;
+ private AggregationFunction<?, ?>[] finalFunctions;
+
+ public AggregationOperator(DataSet<IN> input,
+ TypeInformation<OUT> resultType, TypeInformation<Tuple>
intermediateType, int[] groupKeys, AggregationFunction<?, ?>[] finalFunctions,
AggregationFunction<?, ?>[] intermediateFunctions) {
+ super(input, resultType);
+ this.intermediateType = intermediateType;
+ this.groupKeys = groupKeys;
+ this.intermediateFunctions = intermediateFunctions;
+ this.finalFunctions = finalFunctions;
+ }
+
+ @Override
+ protected org.apache.flink.api.common.operators.SingleInputOperator<?,
OUT, ?> translateToDataFlow(
+ Operator<IN> input) {
+ MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>>
intermediateMapper = createIntermediateMapper();
+ ReduceOperatorBase<Tuple, ReduceFunction<Tuple>> reducer =
createReducer();
+ MapOperatorBase<Tuple, OUT, MapFunction<Tuple, OUT>>
finalMapper = createFinalMapper();
+ intermediateMapper.setInput(input);
+ reducer.setInput(intermediateMapper);
+ finalMapper.setInput(reducer);
+ return finalMapper;
+ }
+
+ private MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>>
createIntermediateMapper() {
+ @SuppressWarnings("unchecked")
+ MapFunction<IN, Tuple> udf = (MapFunction<IN, Tuple>) new
AggregationMapIntermediateUdf<Tuple, Tuple>(intermediateFunctions);
+ UnaryOperatorInformation<IN, Tuple> operatorInfo = new
UnaryOperatorInformation<IN, Tuple>(getInputType(), intermediateType);
+ String name =
createOperatorName("aggregate/intermediate-mapper", intermediateFunctions);
+ MapOperatorBase<IN, Tuple, MapFunction<IN, Tuple>>
intermediateMapper = new MapOperatorBase<IN, Tuple, MapFunction<IN,
Tuple>>(udf, operatorInfo, name);
+
intermediateMapper.setDegreeOfParallelism(this.getParallelism());
--- End diff --
The DOP (degree of parallelism) of the init mapper should be the DOP of the input operator.
Otherwise, data might be shipped over the network before it is combined.
> Add support for out-of-place aggregations
> -----------------------------------------
>
> Key: FLINK-1293
> URL: https://issues.apache.org/jira/browse/FLINK-1293
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Scala API
> Affects Versions: 0.7.0-incubating
> Reporter: Viktor Rosenfeld
> Assignee: Viktor Rosenfeld
> Priority: Minor
>
> Currently, the output of an aggregation is of the same type as the input.
> This restriction has two major drawbacks:
> 1. Every tuple field can only be used in one aggregation because the
> aggregation's result is stored in the field.
> 2. Aggregations having a return type that is different from the input type,
> e.g., count or average, cannot be implemented.
> It would be nice to have the aggregation return any kind of tuple as a
> result, so the restrictions above no longer apply.
> See also:
> -
> https://github.com/stratosphere/stratosphere/wiki/Design-of-Aggregate-Operator
> -
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-td2311.html
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)