http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java new file mode 100644 index 0000000..a4e1248 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.CrossFunction; + + +/** + * The abstract base class for Cross functions. Cross functions build a Cartesian product of their inputs + * and call the function for each pair of elements. + * They are a means of convenience and can be used to directly manipulate the + * pair of elements, instead of having the operator build 2-tuples, and then using a + * MapFunction over those 2-tuples. 
+ * <p> + * The basic syntax for using Cross on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.cross(set2).with(new MyCrossFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * <p> + * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ +public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + + /** + * The core method of the cross operation. The method will be invoked for each pair of elements + * in the Cartesian product. + * + * @param first The element from the first input. + * @param second The element from the second input. + * @return The result element. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract OUT cross(IN1 first, IN2 second) throws Exception; + +}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java new file mode 100644 index 0000000..e3baa74 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * The abstract base class for Filter functions. A filter function take elements and evaluates a + * predicate on them to decide whether to keep the element, or to discard it. 
+ * <p> + * The basic syntax for using a FilterFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.filter(new MyFilterFunction()); + * </blockquote></pre> + * <p> + * Like all functions, the FilterFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <T> The type of the filtered elements. + */ +public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> { + + private static final long serialVersionUID = 1L; + + /** + * The core method of the FilterFunction. The method is called for each element in the input, + * and determines whether the element should be kept or filtered out. If the method returns true, + * the element passes the filter and is kept, if the method returns false, the element is + * filtered out. + * + * @param value The input value to be filtered. + * @return Flag to indicate whether to keep the value (true) or to discard it (false). + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract boolean filter(T value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java new file mode 100644 index 0000000..8c326c6 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.util.Collector; + +import java.util.Iterator; + +public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> { + + private static final long serialVersionUID = 1L; + + @Override + public abstract void combine(Iterator<T> values, Collector<T> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java new file mode 100644 index 0000000..15b4539 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.util.Collector; + +/** + * The abstract base class for Join functions. Join functions combine two data sets by joining their + * elements on specified keys and calling this function with each pair of joining elements. + * By default, this follows strictly the semantics of an "inner join" in SQL, + * meaning that elements are filtered out + * if their key is not contained in the other data set. + * <p> + * Per the semantics of an inner join, the function is invoked only for pairs of elements that match on the join keys. + * <p> + * The basic syntax for using Join on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * The keys can be defined through tuple field positions or key extractors. + * See {@link org.apache.flink.api.java.operators.Keys} for details. 
+ * <p> + * The Join function is actually not a necessary part of a join operation. If no JoinFunction is provided, + * the result of the operation is a sequence of Tuple2, where the elements in the tuple are those that + * the JoinFunction would have been invoked with. + * <P> + * Note: You can use a {@link RichCoGroupFunction} to perform an outer join. + * <p> + * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ +public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements FlatJoinFunction<IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + /** + * The user-defined method for performing transformations after a join. + * The method is called with matching pairs of elements from the inputs. + * + * @param first The element from first input. + * @param second The element from second input. + * @return The resulting element. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. 
+ */ + @Override + public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java new file mode 100644 index 0000000..2293b5e --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +/** + * The abstract base class for flatMap functions. FlatMap functions take elements and transform them, + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists + * and arrays. 
Operations that produce strictly one result element per input element can also + * use the {@link RichMapFunction}. + * <p> + * The basic syntax for using a FlatMapFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<Y> result = input.flatMap(new MyFlatMapFunction()); + * </blockquote></pre> + * <p> + * Like all functions, the FlatMapFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN> Type of the input elements. + * @param <OUT> Type of the returned elements. + */ +public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> { + + private static final long serialVersionUID = 1L; + + /** + * The core method of the FlatMapFunction. Takes an element from the input data set and transforms + * it into zero, one, or more elements. + * + * @param value The input value. + * @param out The collector for emitting result values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java new file mode 100644 index 0000000..eb75f53 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.util.Collector; + +/** + * The abstract base class for group reduce functions. Group reduce functions process groups of elements. + * They may aggregate them to a single value, or produce multiple result values for each group. + * <p> + * For a reduce functions that works incrementally by combining always two elements, see + * {@link RichReduceFunction}, called via {@link org.apache.flink.api.java.DataSet#reduce(RichReduceFunction)}. 
+ * <p> + * The basic syntax for using a grouped GroupReduceFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction()); + * </blockquote></pre> + * <p> + * GroupReduceFunctions may be "combinable", in which case they can pre-reduce partial groups in order to + * reduce the data volume early. See the {@link #combine(Iterator, Collector)} function for details. + * <p> + * Like all functions, the GroupReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN> Type of the elements that this function processes. + * @param <OUT> The type of the elements returned by the user-defined function. + */ +public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN> { + + private static final long serialVersionUID = 1L; + + /** + * Core method of the reduce function. It is called once per group of elements. If the reducer + * is not grouped, then the entire data set is considered one group. + * + * @param values The iterator returning the group of values to be reduced. + * @param out The collector to emit the returned values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception; + + /** + * The combine method pre-reduces elements. It may be called on subsets of the data + * before the actual reduce function. This is often helpful to lower data volume prior + * to reorganizing the data in an expensive way, as might be required for the final + * reduce function. 
+ * <p> + * This method is only ever invoked when the subclass of {@link RichGroupReduceFunction} + * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining + * the <i>reduceGroup</i> operation via + * {@link org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean)}. + * <p> + * Since the reduce function will be called on the result of this method, it is important that this + * method returns the same data type as it consumes. By default, this method only calls the + * {@link #reduce(Iterator, Collector)} method. If the behavior in the pre-reducing is different + * from the final reduce function (for example because the reduce function changes the data type), + * this method must be overwritten, or the execution will fail. + * + * @param values The iterator returning the group of values to be reduced. + * @param out The collector to emit the returned values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public void combine(Iterator<IN> values, Collector<IN> out) throws Exception { + @SuppressWarnings("unchecked") + Collector<OUT> c = (Collector<OUT>) out; + reduce(values, c); + } + + // -------------------------------------------------------------------------------------------- + + /** + * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in order to mark + * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterator, Collector)} + * method on such functions, to pre-reduce the data before transferring it over the network to + * the actual group reduce operation. + * <p> + * Marking combinable functions as such is in general beneficial for performance. 
+ */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.TYPE) + public static @interface Combinable {}; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java new file mode 100644 index 0000000..7eaf44c --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.functions; + + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.JoinFunction; + +public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> { + + private static final long serialVersionUID = 1L; + + @Override + public abstract OUT join(IN1 first, IN2 second) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java new file mode 100644 index 0000000..54de7d4 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; + +/** + * The abstract base class for Map functions. Map functions take elements and transform them, + * element wise. A Map function always produces a single result element for each input element. + * Typical applications are parsing elements, converting data types, or projecting out fields. + * Operations that produce multiple result elements from a single input element can be implemented + * using the {@link RichFlatMapFunction}. + * <p> + * The basic syntax for using a MapFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<Y> result = input.map(new MyMapFunction()); + * </blockquote></pre> + * <p> + * Like all functions, the MapFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN> Type of the input elements. + * @param <OUT> Type of the returned elements. + */ +public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> { + + private static final long serialVersionUID = 1L; + + /** + * The core method of the MapFunction. Takes an element from the input data set and transforms + * it into another element. + * + * @param value The input value. + * @return The value produced by the map function from the input value. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. 
+ */ + @Override + public abstract OUT map(IN value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java new file mode 100644 index 0000000..35cb392 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.ReduceFunction; + +/** + * The abstract base class for Reduce functions. Reduce functions combine groups of elements to + * a single value, by taking always two elements and combining them into one. Reduce functions + * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced + * individually. 
+ * <p> + * For a reduce functions that work on an entire group at the same time (such as the + * MapReduce/Hadoop-style reduce), see {@link RichGroupReduceFunction}, called via + * {@link org.apache.flink.api.java.DataSet#reduceGroup(RichGroupReduceFunction)}. In the general case, + * ReduceFunctions are considered faster, because they allow the system to use hash-based + * execution strategies. + * <p> + * The basic syntax for using a grouped ReduceFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction()); + * </blockquote></pre> + * <p> + * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <T> Type of the elements that this function processes. + */ +public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> { + + private static final long serialVersionUID = 1L; + + /** + * The core method of the ReduceFunction, combining two values into one value of the same type. + * The reduce function is consecutively applied to all values of a group until only a single value remains. + * + * @param value1 The first value to combine. + * @param value2 The second value to combine. + * @return The combined value of both input values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. 
+ */ + public abstract T reduce(T value1, T value2) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java new file mode 100644 index 0000000..ccc4685 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.functions; + +import org.apache.flink.api.common.InvalidProgramException; + +public class UnsupportedLambdaExpressionException extends InvalidProgramException { + + private static final long serialVersionUID = -1721898801986321010L; + + public UnsupportedLambdaExpressionException() { + super("Java 8 lambda expressions are currently supported only in filter and reduce user-defined functions."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index ca6ed94..80a5fa0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -32,8 +32,8 @@ import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.java.aggregation.AggregationFunction; import org.apache.flink.api.java.aggregation.AggregationFunctionFactory; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable; +import 
org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; @@ -151,7 +151,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate @SuppressWarnings("unchecked") @Override - protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>> translateToDataFlow(Operator<IN> input) { + protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) { // sanity check if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) { @@ -174,7 +174,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate @SuppressWarnings("rawtypes") - GroupReduceFunction<IN, IN> function = new AggregatingUdf(aggFunctions, fields); + RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(aggFunctions, fields); String name = getName() != null ? 
getName() : genName.toString(); @@ -183,8 +183,8 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate if (this.grouping == null) { // non grouped aggregation UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>> po = - new GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>>(function, operatorInfo, new int[0], name); + GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = + new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name); po.setCombinable(true); @@ -200,8 +200,8 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate // grouped aggregation int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>> po = - new GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>>(function, operatorInfo, logicalKeyPositions, name); + GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = + new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name); po.setCombinable(true); @@ -245,7 +245,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate // -------------------------------------------------------------------------------------------- @Combinable - public static final class AggregatingUdf<T extends Tuple> extends GroupReduceFunction<T, T> { + public static final class AggregatingUdf<T extends Tuple> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; private final int[] fieldPositions; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 8748556..89c3334 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -21,8 +21,9 @@ package org.apache.flink.api.java.operators; import java.security.InvalidParameterException; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.GenericCoGrouper; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -30,8 +31,8 @@ import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder; -import org.apache.flink.api.java.functions.CoGroupFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.operators.Keys.FieldPositionKeys; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingCoGroupOperator; @@ -123,8 
+124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions(); - CoGroupOperatorBase<I1, I2, OUT, GenericCoGrouper<I1, I2, OUT>> po = - new CoGroupOperatorBase<I1, I2, OUT, GenericCoGrouper<I1, I2, OUT>>( + CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> po = + new CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>( function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), logicalKeyPositions1, logicalKeyPositions2, name); @@ -199,10 +200,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor()); final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor()); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, 
I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); cogroup.setFirstInput(keyMapper1); @@ -236,10 +237,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU final TupleKeyExtractingMapper<I1, K> extractor1 = new TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]); final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor()); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); @@ -274,10 +275,10 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU final 
KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor()); final TupleKeyExtractingMapper<I2, K> extractor2 = new TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); @@ -407,7 +408,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @param field0 The first index of the Tuple fields of the second co-grouped DataSets that should be used as key * @param fields The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys. * @return An incomplete CoGroup transformation. 
- * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)} to finalize the CoGroup transformation. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)} to finalize the CoGroup transformation. */ public CoGroupOperatorWithoutFunction equalTo(int field0, int... fields) { int[] actualFields = new int[fields.length + 1]; @@ -423,7 +424,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @param field0 The first field of the second co-grouped DataSets that should be used as key * @param fields The fields of the first co-grouped DataSets that should be used as keys. * @return An incomplete CoGroup transformation. - * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)} to finalize the CoGroup transformation. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)} to finalize the CoGroup transformation. */ public CoGroupOperatorWithoutFunction equalTo(String field0, String... fields) { String[] actualFields = new String[fields.length + 1]; @@ -439,7 +440,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * * @param keyExtractor The KeySelector function which extracts the key values from the second DataSet on which it is grouped. * @return An incomplete CoGroup transformation. 
- * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)} to finalize the CoGroup transformation. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)} to finalize the CoGroup transformation. */ public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) { return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType())); @@ -447,8 +448,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU /** * Intermediate step of a CoGroup transformation. <br/> - * To continue the CoGroup transformation, provide a {@link CoGroupFunction} by calling - * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)}. + * To continue the CoGroup transformation, provide a {@link org.apache.flink.api.java.functions.RichCoGroupFunction} by calling + * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}. * */ private CoGroupOperatorWithoutFunction createCoGroupOperator(Keys<I2> keys2) { @@ -501,19 +502,22 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU } /** - * Finalizes a CoGroup transformation by applying a {@link CoGroupFunction} to groups of elements with identical keys.<br/> + * Finalizes a CoGroup transformation by applying a {@link org.apache.flink.api.java.functions.RichCoGroupFunction} to groups of elements with identical keys.<br/> * Each CoGroupFunction call returns an arbitrary number of keys. 
* * @param function The CoGroupFunction that is called for all groups of elements with identical keys. * @return An CoGroupOperator that represents the co-grouped result DataSet. * - * @see CoGroupFunction + * @see org.apache.flink.api.java.functions.RichCoGroupFunction * @see DataSet */ public <R> CoGroupOperator<I1, I2, R> with(CoGroupFunction<I1, I2, R> function) { if (function == null) { throw new NullPointerException("CoGroup function must not be null."); } + if (FunctionUtils.isSerializedLambdaFunction(function)) { + throw new UnsupportedLambdaExpressionException(); + } TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType()); return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java index 036d292..d1e99d6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java @@ -21,14 +21,15 @@ package org.apache.flink.api.java.operators; import java.util.Arrays; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.GenericCrosser; +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.CrossOperatorBase; 
import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.functions.CrossFunction; import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.types.TypeInformation; @@ -71,12 +72,12 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, } @Override - protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + protected org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { String name = getName() != null ? getName() : function.getClass().getName(); // create operator - CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1, I2, OUT>> po = - new CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name); + CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po = + new CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), name); // set inputs po.setFirstInput(input1); po.setSecondInput(input2); @@ -133,6 +134,9 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, if (function == null) { throw new NullPointerException("Cross function must not be null."); } + if (FunctionUtils.isSerializedLambdaFunction(function)) { + throw new UnsupportedLambdaExpressionException(); + } TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType()); return new CrossOperator<I1, I2, R>(input1, input2, 
function, returnType); } @@ -220,7 +224,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, } } - public static final class ProjectCrossFunction<T1, T2, R extends Tuple> extends CrossFunction<T1, T2, R> { + public static final class ProjectCrossFunction<T1, T2, R extends Tuple> implements CrossFunction<T1, T2, R> { private static final long serialVersionUID = 1L; @@ -1398,7 +1402,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, // default join functions // -------------------------------------------------------------------------------------------- - public static final class DefaultCrossFunction<T1, T2> extends CrossFunction<T1, T2, Tuple2<T1, T2>> { + public static final class DefaultCrossFunction<T1, T2> implements CrossFunction<T1, T2, Tuple2<T1, T2>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index cb7db06..591551f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -21,13 +21,14 @@ package org.apache.flink.api.java.operators; import java.util.Iterator; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; import 
org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.java.functions.GroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; @@ -78,15 +79,17 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera @Override protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) { - GroupReduceFunction<T, T> function = new DistinctFunction<T>(); + final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>(); + final FlatCombineFunction<T> combineFunction = new DistinctCombiner<T>(); + String name = function.getClass().getName(); if (keys instanceof Keys.FieldPositionKeys) { int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType()); - GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>> po = - new GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>>(function, operatorInfo, logicalKeyPositions, name); + GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po = + new GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name); po.setCombinable(true); po.setInput(input); @@ -98,9 +101,10 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera @SuppressWarnings("unchecked") Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) keys; - + + PlanUnwrappingReduceGroupOperator<T, T, 
?> po = translateSelectorFunctionDistinct( - selectorKeys, function, getInputType(), getResultType(), name, input, true); + selectorKeys, function, combineFunction, getInputType(), getResultType(), name, input, true); po.setDegreeOfParallelism(this.getParallelism()); @@ -114,7 +118,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera // -------------------------------------------------------------------------------------------- private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct( - Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function, + Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, FlatCombineFunction<IN> combineFunction, TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input, boolean combinable) { @@ -124,10 +128,12 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType); KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor()); + + + PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = + new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable); - PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable); - - MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor"); + MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new 
UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor"); reducer.setInput(mapper); mapper.setInput(input); @@ -138,7 +144,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera return reducer; } - public static final class DistinctFunction<T> extends GroupReduceFunction<T, T> { + public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; @@ -148,4 +154,16 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera out.collect(values.next()); } } + + public static final class DistinctCombiner<T> implements FlatCombineFunction<T> { + + private static final long serialVersionUID = 1L; + + @Override + public void combine(Iterator<T> values, Collector<T> out) + throws Exception { + out.collect(values.next()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java index ed4a786..1c03ccd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java @@ -18,9 +18,9 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.functions.FilterFunction; import org.apache.flink.api.java.operators.translation.PlanFilterOperator; import org.apache.flink.api.java.DataSet; @@ -44,7 +44,7 @@ public 
class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat } @Override - protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, GenericFlatMap<T,T>> translateToDataFlow(Operator<T> input) { + protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) { String name = getName() != null ? getName() : function.getClass().getName(); // create operator http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java index 14c0819..8e531d4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java @@ -18,11 +18,10 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; -import org.apache.flink.api.java.functions.FlatMapFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.DataSet; @@ -47,11 +46,11 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl } @Override - protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, GenericFlatMap<IN,OUT>> translateToDataFlow(Operator<IN> input) { + protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> 
translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : function.getClass().getName(); // create operator - FlatMapOperatorBase<IN, OUT, GenericFlatMap<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, GenericFlatMap<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); + FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set dop http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java new file mode 100644 index 0000000..7ab0b11 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.operators; + +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; +import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.types.TypeInformation; + +import org.apache.flink.api.java.DataSet; + + +/** + * This operator represents the application of a "reduceGroup" function on a data set, and the + * result data set produced by the function. + * + * @param <IN> The type of the data set consumed by the operator. + * @param <OUT> The type of the data set created by the operator. + */ +public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupReduceOperator<IN, OUT>> { + + private final GroupReduceFunction<IN, OUT> function; + + private final Grouping<IN> grouper; + + // reduceFunction is a GroupReduceFunction + private boolean richFunction; + + private boolean combinable; + + /** + * Constructor for a non-grouped reduce (all reduce). + * + * @param input The input data set to the groupReduce function. + * @param function The user-defined GroupReduce function. 
+ */ + public GroupReduceOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) { + super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType())); + + this.function = function; + this.grouper = null; + + checkCombinability(); + } + + /** + * Constructor for a grouped reduce. + * + * @param input The grouped input to be processed group-wise by the groupReduce function. + * @param function The user-defined GroupReduce function. + */ + public GroupReduceOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) { + super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType())); + + this.function = function; + this.grouper = input; + + checkCombinability(); + + extractSemanticAnnotationsFromUdf(function.getClass()); + } + + private void checkCombinability() { + if (function instanceof FlatCombineFunction && + function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) { + this.combinable = true; + } + } + + + // -------------------------------------------------------------------------------------------- + // Properties + // -------------------------------------------------------------------------------------------- + + public boolean isCombinable() { + return combinable; + } + + public void setCombinable(boolean combinable) { + // sanity check that the function is a subclass of the combine interface + if (combinable && !(function instanceof FlatCombineFunction)) { + throw new IllegalArgumentException("The function does not implement the combine interface."); + } + + this.combinable = combinable; + } + + @Override + protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) { + + String name = getName() != null ? 
getName() : function.getClass().getName(); + + // distinguish between grouped reduce and non-grouped reduce + if (grouper == null) { + // non grouped reduce + UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); + GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = + new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, new int[0], name); + + po.setCombinable(combinable); + // set input + po.setInput(input); + // the degree of parallelism for a non grouped reduce can only be 1 + po.setDegreeOfParallelism(1); + return po; + } + + if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) { + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys(); + + PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer( + selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable()); + + po.setDegreeOfParallelism(this.getParallelism()); + + return po; + } + else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) { + + int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); + UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); + GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = + new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); + + po.setCombinable(combinable); + po.setInput(input); + po.setDegreeOfParallelism(this.getParallelism()); + + // set group order + if (grouper instanceof SortedGrouping) { + SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper; + + int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions(); + Order[] sortOrders = sortedGrouper.getGroupSortOrders(); + + Ordering o = new Ordering(); + for(int i=0; i < 
sortKeyPositions.length; i++) { + o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]); + } + po.setGroupOrder(o); + } + + return po; + } + else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { + + int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); + UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); + GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = + new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); + + po.setCombinable(combinable); + po.setInput(input); + po.setDegreeOfParallelism(this.getParallelism()); + + return po; + } + else { + throw new UnsupportedOperationException("Unrecognized key type."); + } + + } + + + // -------------------------------------------------------------------------------------------- + + private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionReducer( + Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function, + TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input, + boolean combinable) + { + @SuppressWarnings("unchecked") + final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys; + + TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType); + + KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor()); + + PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable); + + MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key 
Extractor"); + + reducer.setInput(mapper); + mapper.setInput(input); + + // set the mapper's parallelism to the input parallelism to make sure it is chained + mapper.setDegreeOfParallelism(input.getDegreeOfParallelism()); + + return reducer; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java index 37e74ef..3223f4d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java @@ -26,8 +26,8 @@ import org.apache.flink.api.java.DataSet; * Grouping is an intermediate step for a transformation on a grouped DataSet.<br/> * The following transformation can be applied on Grouping: * <ul> - * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.ReduceFunction)},</li> - * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.GroupReduceFunction)}, and</li> + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li> + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}, and</li> * <li>{@link UnsortedGrouping#aggregate(org.apache.flink.api.java.aggregation.Aggregations, int)}.</li> * </ul> *
