http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java new file mode 100644 index 0000000..0c8bc97 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + + +/** + * @param <IN1> First input type + * @param <IN2> Second input type + * @param <OUT> Output type + */ +public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable { + + /** + * User defined function for the cross operator. + * + * @param record1 Record from first input + * @param record2 Record from the second input + * @return result of cross UDF. + * @throws Exception + */ + OUT cross(IN1 record1, IN2 record2) throws Exception; + +}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java new file mode 100644 index 0000000..2f68477 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; +import java.io.Serializable; + +public interface FilterFunction<T> extends Function, Serializable { + + /** + * User defined function for a filter. + * + * @param value Incoming tuples + * @return true for tuples that are allowed to pass the filter + * @throws Exception + */ + boolean filter(T value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java new file mode 100644 index 0000000..b2c8f30 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.flink.util.Collector; + +/** + * Generic interface used for combiners. + */ +public interface FlatCombineFunction<T> extends Function, Serializable { + + void combine(Iterator<T> values, Collector<T> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java new file mode 100644 index 0000000..6a6b971 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import org.apache.flink.util.Collector; + +import java.io.Serializable; + + +public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable { + + void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java new file mode 100644 index 0000000..a8696cf --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import org.apache.flink.util.Collector; + +import java.io.Serializable; + + +/** + * + * @param <T> + * @param <O> + */ +public interface FlatMapFunction<T, O> extends Function, Serializable { + + /** + * The core method of FlatMappable. Takes an element from the input data set and transforms + * it into zero, one, or more elements. + * + * @param value The input value. + * @param out The collector for for emitting result values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + void flatMap(T value, Collector<O> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java index d3b7db4..c2a201f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java @@ -16,82 +16,13 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; -import org.apache.flink.configuration.Configuration; - /** - * An base interface for all user-defined functions. This class defines methods for - * the life cycle of the functions, as well as methods to access the context in which the functions - * are executed. + * An base interface for all user-defined functions. This interface is empty in order + * to enable functions that are SAM (single abstract method) interfaces, so that they + * can be called as Java 8 lambdas */ public interface Function { - - /** - * Initialization method for the function. It is called before the actual working methods - * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that - * are part of an iteration, this method will be invoked at the beginning of each iteration superstep. - * <p> - * The configuration object passed to the function can be used for configuration and initialization. - * The configuration contains all parameters that were configured on the function in the program - * composition. - * - * <pre><blockquote> - * public class MyMapper extends FilterFunction<String> { - * - * private String searchString; - * - * public void open(Configuration parameters) { - * this.searchString = parameters.getString("foo"); - * } - * - * public boolean filter(String value) { - * return value.equals(searchString); - * } - * } - * </blockquote></pre> - * <p> - * By default, this method does nothing. - * - * @param parameters The configuration containing the parameters attached to the contract. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the task and lets the fail-over logic - * decide whether to retry the task execution. - * - * @see org.apache.flink.configuration.Configuration - */ - void open(Configuration parameters) throws Exception; - /** - * Teardown method for the user code. It is called after the last call to the main working methods - * (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will - * be invoked after each iteration superstep. - * <p> - * This method can be used for clean up work. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the task and lets the fail-over logic - * decide whether to retry the task execution. - */ - void close() throws Exception; - - - /** - * Gets the context that contains information about the UDF's runtime. - * - * Context information are for example {@link org.apache.flink.api.common.accumulators.Accumulator}s - * or the {@link org.apache.flink.api.common.cache.DistributedCache}. - * - * @return The UDF's runtime context. - */ - RuntimeContext getRuntimeContext(); - - /** - * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function. - * - * @param t The runtime context. - */ - void setRuntimeContext(RuntimeContext t); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java deleted file mode 100644 index 59669a2..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import java.util.Iterator; - -import org.apache.flink.util.Collector; - - -public interface GenericCoGrouper<V1, V2, O> extends Function { - - /** - * This method must be implemented to provide a user implementation of a - * coGroup. It is called for each two key-value pairs that share the same - * key and come from different inputs. - * - * @param records1 The records from the first input which were paired with the key. - * @param records2 The records from the second input which were paired with the key. - * @param out A collector that collects all output pairs. - */ - void coGroup(Iterator<V1> records1, Iterator<V2> records2, Collector<O> out) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java index ada4eeb..41cfa1d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java @@ -23,7 +23,7 @@ import org.apache.flink.util.Collector; -public interface GenericCollectorMap<T, O> extends Function { +public interface GenericCollectorMap<T, O> extends RichFunction { void map(T record, Collector<O> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java deleted file mode 100644 index 8dfe758..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import java.util.Iterator; - -import org.apache.flink.util.Collector; - -/** - * Generic interface used for combiners. - */ -public interface GenericCombine<T> extends Function { - - void combine(Iterator<T> records, Collector<T> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java deleted file mode 100644 index 3de9b1d..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import org.apache.flink.util.Collector; - - -/** - * @param <V1> First input type - * @param <V2> Second input type - * @param <O> Output type - */ -public interface GenericCrosser<V1, V2, O> extends Function { - - /** - * User defined function for the cross operator. - * - * @param record1 Record from first input - * @param record2 Record from the second input - * @param out Collector to submit resulting records. - * @throws Exception - */ - void cross(V1 record1, V2 record2, Collector<O> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java deleted file mode 100644 index f34b038..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - - -public interface GenericFilter<T> extends Function { - - /** - * User defined function for a filter. - * - * @param value Incoming tuples - * @return true for tuples that are allowed to pass the filter - * @throws Exception - */ - boolean filter(T value) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java deleted file mode 100644 index efb1d49..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import org.apache.flink.util.Collector; - - -/** - * - * @param <T> - * @param <O> - */ -public interface GenericFlatMap<T, O> extends Function { - - /** - * User defined function to perform transformations on records. - * This method allows to submit an arbitrary number of records - * per incoming tuple. - * - * @param record incoming record - * @param out outgoing collector to return none, one or more records - * @throws Exception - */ - void flatMap(T record, Collector<O> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java deleted file mode 100644 index e8d9910..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import java.util.Iterator; - -import org.apache.flink.util.Collector; - - -/** - * - * @param <T> Incoming types - * @param <O> Outgoing types - */ -public interface GenericGroupReduce<T, O> extends Function { - /** - * - * The central function to be implemented for a reducer. The function receives per call one - * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly - * one function call across all involved instances across all computing nodes. - * - * @param records All records that belong to the given input key. - * @param out The collector to hand results to. - * @throws Exception - */ - void reduce(Iterator<T> records, Collector<O> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java deleted file mode 100644 index 77c2ac9..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - -import org.apache.flink.util.Collector; - - -public interface GenericJoiner<V1, V2, O> extends Function { - - void join(V1 value1, V2 value2, Collector<O> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java deleted file mode 100644 index 316bf5d..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - - -public interface GenericMap<T, O> extends Function { - - /** - * A user-implemented function that modifies or transforms an incoming object and - * returns the result. - */ - O map(T record) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java deleted file mode 100644 index 9e75f2e..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - - -public interface GenericReduce<T> extends Function { - - T reduce(T value1, T value2) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java new file mode 100644 index 0000000..984d1fd --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.flink.util.Collector; + + +/** + * + * @param <T> Incoming types + * @param <O> Outgoing types + */ +public interface GroupReduceFunction<T, O> extends Function, Serializable { + /** + * + * The central function to be implemented for a reducer. The function receives per call one + * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly + * one function call across all involved instances across all computing nodes. + * + * @param records All records that belong to the given input key. + * @param out The collector to hand results to. + * @throws Exception + */ + void reduce(Iterator<T> values, Collector<O> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java new file mode 100644 index 0000000..02f526a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + + +public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable { + + OUT join(IN1 first, IN2 second) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java new file mode 100644 index 0000000..4e2520d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + + +import java.io.Serializable; + +public interface MapFunction<T, O> extends Function, Serializable { + + /** + * The core method of Mappable. Takes an element from the input data set and transforms + * it into exactly one element. + * + * @param value The input value. + * @returns The transformed value + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + O map(T value) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java new file mode 100644 index 0000000..04f690a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + + +import java.io.Serializable; + +public interface ReduceFunction<T> extends Function, Serializable { + + /** + * The core method of Reducible, combining two values into one value of the same type. + * The reduce function is consecutively applied to all values of a group until only a single value remains. + * + * @param value1 The first value to combine. + * @param value2 The second value to combine. + * @return The combined value of both input values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + T reduce(T value1, T value2) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java new file mode 100644 index 0000000..ffc3ac2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import org.apache.flink.configuration.Configuration; + +/** + * An base interface for all rich user-defined functions. This class defines methods for + * the life cycle of the functions, as well as methods to access the context in which the functions + * are executed. + */ +public interface RichFunction extends Function { + + /** + * Initialization method for the function. It is called before the actual working methods + * (like <i>map</i> or <i>join</i>) and thus suitable for one time setup work. For functions that + * are part of an iteration, this method will be invoked at the beginning of each iteration superstep. + * <p> + * The configuration object passed to the function can be used for configuration and initialization. + * The configuration contains all parameters that were configured on the function in the program + * composition. + * + * <pre><blockquote> + * public class MyMapper extends FilterFunction<String> { + * + * private String searchString; + * + * public void open(Configuration parameters) { + * this.searchString = parameters.getString("foo"); + * } + * + * public boolean filter(String value) { + * return value.equals(searchString); + * } + * } + * </blockquote></pre> + * <p> + * By default, this method does nothing. + * + * @param parameters The configuration containing the parameters attached to the contract. + * + * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the + * runtime catches an exception, it aborts the task and lets the fail-over logic + * decide whether to retry the task execution. + * + * @see org.apache.flink.configuration.Configuration + */ + void open(Configuration parameters) throws Exception; + + /** + * Teardown method for the user code. It is called after the last call to the main working methods + * (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration, this method will + * be invoked after each iteration superstep. + * <p> + * This method can be used for clean up work. + * + * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the + * runtime catches an exception, it aborts the task and lets the fail-over logic + * decide whether to retry the task execution. + */ + void close() throws Exception; + + + /** + * Gets the context that contains information about the UDF's runtime. + * + * Context information are for example {@link org.apache.flink.api.common.accumulators.Accumulator}s + * or the {@link org.apache.flink.api.common.cache.DistributedCache}. + * + * @return The UDF's runtime context. + */ + RuntimeContext getRuntimeContext(); + + /** + * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function. + * + * @param t The runtime context. + */ + void setRuntimeContext(RuntimeContext t); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index a5f0b19..3cf30ff 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -34,7 +34,7 @@ import org.apache.flink.api.common.cache.DistributedCache; * the current degree of parallelism) and other constructs like accumulators and broadcast variables. * <p> * A function can, during runtime, obtain the RuntimeContext via a call to - * {@link org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}. + * {@link AbstractRichFunction#getRuntimeContext()}. */ public interface RuntimeContext { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java new file mode 100644 index 0000000..bc4ffd0 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.lang.invoke.SerializedLambda; +import java.lang.reflect.Method; + +public class FunctionUtils { + + + public static void openFunction (Function function, Configuration parameters) throws Exception{ + if (function instanceof RichFunction) { + RichFunction richFunction = (RichFunction) function; + richFunction.open (parameters); + } + } + + public static void closeFunction (Function function) throws Exception{ + if (function instanceof RichFunction) { + RichFunction richFunction = (RichFunction) function; + richFunction.close (); + } + } + + public static void setFunctionRuntimeContext (Function function, RuntimeContext context){ + if (function instanceof RichFunction) { + RichFunction richFunction = (RichFunction) function; + richFunction.setRuntimeContext(context); + } + } + + public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){ + if (function instanceof RichFunction) { + RichFunction richFunction = (RichFunction) function; + return richFunction.getRuntimeContext(); + } + else { + return defaultContext; + } + } + + public static boolean isSerializedLambdaFunction(Function function) { + Class<?> clazz = function.getClass(); + try { + Method replaceMethod = clazz.getDeclaredMethod("writeReplace"); + replaceMethod.setAccessible(true); + Object serializedForm = replaceMethod.invoke(function); + if (serializedForm instanceof SerializedLambda) { + return true; + } + else { + return false; + } + } + catch (Exception e) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java index b140dda..c416765 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java @@ -19,13 +19,13 @@ package org.apache.flink.api.common.operators; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; /** * This operator represents a Union between two inputs. */ -public class Union<T> extends DualInputOperator<T, T, T, AbstractFunction> { +public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> { private final static String NAME = "Union"; @@ -34,7 +34,7 @@ public class Union<T> extends DualInputOperator<T, T, T, AbstractFunction> { */ public Union(BinaryOperatorInformation<T, T, T> operatorInfo) { // we pass it an AbstractFunction, because currently all operators expect some form of UDF - super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, NAME); + super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, NAME); } public Union(Operator<T> input1, Operator<T> input2) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java index ffcab50..66bea7f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.api.common.operators.IterationOperator; import org.apache.flink.api.common.operators.Operator; @@ -48,7 +48,7 @@ import org.apache.flink.util.Visitor; /** * */ -public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunction> implements IterationOperator { +public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator { private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>"; @@ -78,7 +78,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunc * @param name */ public BulkIterationBase(UnaryOperatorInformation<T, T> operatorInfo, String name) { - super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, name); + super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, name); inputPlaceHolder = new PartialSolutionPlaceHolder<T>(this, this.getOperatorInfo()); } @@ -230,7 +230,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractFunc /** * Special Mapper that is added before a termination criterion and is only a container for an special aggregator */ - public static class TerminationCriterionMapper<X> extends AbstractFunction implements Serializable, GenericCollectorMap<X, Nothing> { + public static class TerminationCriterionMapper<X> extends AbstractRichFunction implements Serializable, GenericCollectorMap<X, Nothing> { private static final long serialVersionUID = 1L; private TerminationCriterionAggregator aggregator; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java index a9ae97c..4b85a31 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.Ordering; @@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericCoGrouper + * @see org.apache.flink.api.common.functions.CoGroupFunction */ -public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends GenericCoGrouper<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> { +public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> { /** * The ordering for the order inside a group from input one. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java index 33e150d..a66ea72 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericCrosser; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericCrosser + * @see org.apache.flink.api.common.functions.CrossFunction */ -public class CrossOperatorBase<IN1, IN2, OUT, FT extends GenericCrosser<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> { +public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> { public CrossOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) { super(udf, operatorInfo, name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java index 89e5008..8e955b1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java @@ -23,7 +23,7 @@ import java.util.Collections; import java.util.Map; import org.apache.flink.api.common.aggregators.AggregatorRegistry; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.IterationOperator; @@ -51,7 +51,7 @@ import org.apache.flink.util.Visitor; * This class is a subclass of {@code DualInputOperator}. The solution set is considered the first input, the * workset is considered the second input. */ -public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, AbstractFunction> implements IterationOperator { +public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, AbstractRichFunction> implements IterationOperator { private final Operator<ST> solutionSetPlaceholder; @@ -88,7 +88,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab } public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int[] keyPositions, String name) { - super(new UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, name); + super(new UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), operatorInfo, name); this.solutionSetKeyFields = keyPositions; solutionSetPlaceholder = new SolutionSetPlaceHolder<ST>(this, new OperatorInformation<ST>(operatorInfo.getFirstInputType())); worksetPlaceholder = new WorksetPlaceHolder<WT>(this, new OperatorInformation<WT>(operatorInfo.getSecondInputType())); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java index 34896a2..3c28c43 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericFlatMap + * @see org.apache.flink.api.common.functions.FlatMapFunction */ -public class FilterOperatorBase<T, FT extends GenericFlatMap<T, T>> extends SingleInputOperator<T, T, FT> { +public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends SingleInputOperator<T, T, FT> { public FilterOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<T, T> operatorInfo, String name) { super(udf, operatorInfo, name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java index 0de236e..89575b6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -28,9 +28,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericFlatMap + * @see org.apache.flink.api.common.functions.FlatMapFunction */ -public class FlatMapOperatorBase<IN, OUT, FT extends GenericFlatMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { +public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { public FlatMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { super(udf, operatorInfo, name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index a24826a..ac55489 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -19,8 +19,9 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; + +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -30,9 +31,9 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericGroupReduce + * @see org.apache.flink.api.common.functions.GroupReduceFunction */ -public class GroupReduceOperatorBase<IN, OUT, FT extends GenericGroupReduce<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { +public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { /** * The ordering for the order inside a reduce group. @@ -91,15 +92,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GenericGroupReduce<IN, /** * Marks the group reduce operation as combinable. Combinable operations may pre-reduce the * data before the actual group reduce operations. Combinable user-defined functions - * must implement the interface {@link GenericCombine}. + * must implement the interface {@link org.apache.flink.api.common.functions.FlatCombineFunction}. * * @param combinable Flag to mark the group reduce operation as combinable. */ public void setCombinable(boolean combinable) { // sanity check - if (combinable && !GenericCombine.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { - throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " + - GenericCombine.class.getName()); + if (combinable && !FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass())) { + throw new IllegalArgumentException("Cannot set a UDF as combinable if it does not implement the interface " + + FlatCombineFunction.class.getName()); } else { this.combinable = combinable; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java index b4eeeaa..2ce0529 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -27,9 +27,9 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; /** - * @see GenericJoiner + * @see org.apache.flink.api.common.functions.FlatJoinFunction */ -public class JoinOperatorBase<IN1, IN2, OUT, FT extends GenericJoiner<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> +public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> { public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) { super(udf, operatorInfo, keyPositions1, keyPositions2, name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java index efd8fa9..26fde05 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -33,7 +33,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; * @param <OUT> The result type. * @param <FT> The type of the user-defined function. */ -public class MapOperatorBase<IN, OUT, FT extends GenericMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { +public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { public MapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { super(udf, operatorInfo, name); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java index 62996ea..e6c435f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.operators.base; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -31,12 +31,12 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; * Base data flow operator for Reduce user-defined functions. Accepts reduce functions * and key positions. The key positions are expected in the flattened common data model. * - * @see GenericReduce + * @see org.apache.flink.api.common.functions.ReduceFunction * * @param <T> The type (parameters and return type) of the reduce function. * @param <FT> The type of the reduce function. */ -public class ReduceOperatorBase<T, FT extends GenericReduce<T>> extends SingleInputOperator<T, T, FT> { +public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleInputOperator<T, T, FT> { /** * Creates a grouped reduce data flow operator. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java index 2ebae24..7d6495b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java @@ -25,11 +25,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.api.common.functions.GenericCrosser; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.io.InputFormat; @@ -56,10 +56,10 @@ public class OperatorUtil { static { STUB_CONTRACTS.put(GenericCollectorMap.class, CollectorMapOperatorBase.class); - STUB_CONTRACTS.put(GenericGroupReduce.class, GroupReduceOperatorBase.class); - STUB_CONTRACTS.put(GenericCoGrouper.class, CoGroupOperatorBase.class); - STUB_CONTRACTS.put(GenericCrosser.class, CrossOperatorBase.class); - STUB_CONTRACTS.put(GenericJoiner.class, JoinOperatorBase.class); + STUB_CONTRACTS.put(GroupReduceFunction.class, GroupReduceOperatorBase.class); + STUB_CONTRACTS.put(CoGroupFunction.class, CoGroupOperatorBase.class); + STUB_CONTRACTS.put(CrossFunction.class, CrossOperatorBase.class); + STUB_CONTRACTS.put(FlatJoinFunction.class, JoinOperatorBase.class); STUB_CONTRACTS.put(FileInputFormat.class, GenericDataSourceBase.class); STUB_CONTRACTS.put(FileOutputFormat.class, GenericDataSinkBase.class); STUB_CONTRACTS.put(InputFormat.class, GenericDataSourceBase.class); @@ -67,7 +67,7 @@ public class OperatorUtil { } /** - * Returns the associated {@link Operator} type for the given {@link org.apache.flink.api.common.functions.Function} class. + * Returns the associated {@link Operator} type for the given {@link org.apache.flink.api.common.functions.RichFunction} class. * * @param stubClass * the stub class http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java index 647ceab..30091ab 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java @@ -21,12 +21,12 @@ package org.apache.flink.api.common.operators.util; import static org.junit.Assert.assertEquals; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.api.common.functions.GenericCrosser; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.io.DelimitedInputFormat; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; @@ -36,7 +36,6 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase; import org.apache.flink.api.common.operators.base.GenericDataSourceBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.common.operators.util.OperatorUtil; import org.apache.flink.types.IntValue; import org.junit.Test; @@ -85,7 +84,7 @@ public class OperatorUtilTest { */ @Test public void getContractClassShouldReturnNullForStub() { - final Class<?> result = OperatorUtil.getContractClass(Function.class); + final Class<?> result = OperatorUtil.getContractClass(RichFunction.class); assertEquals(null, result); } @@ -116,13 +115,13 @@ public class OperatorUtilTest { assertEquals(GenericDataSourceBase.class, result); } - static abstract class CoGrouper implements GenericCoGrouper<IntValue, IntValue, IntValue> {} + static abstract class CoGrouper implements CoGroupFunction<IntValue, IntValue, IntValue> {} - static abstract class Crosser implements GenericCrosser<IntValue, IntValue, IntValue> {} + static abstract class Crosser implements CrossFunction<IntValue, IntValue, IntValue> {} static abstract class Mapper implements GenericCollectorMap<IntValue, IntValue> {} - static abstract class Matcher implements GenericJoiner<IntValue, IntValue, IntValue> {} + static abstract class Matcher implements FlatJoinFunction<IntValue, IntValue, IntValue> {} - static abstract class Reducer implements GenericGroupReduce<IntValue, IntValue> {} + static abstract class Reducer implements GroupReduceFunction<IntValue, IntValue> {} } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml index 549e95b..ea0db5d 100644 --- a/flink-examples/flink-java-examples/pom.xml +++ b/flink-examples/flink-java-examples/pom.xml @@ -317,7 +317,6 @@ under the License. </executions> </plugin> - </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java index 4c22db1..8767aca 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java @@ -21,8 +21,9 @@ package org.apache.flink.example.java.clustering; import java.io.Serializable; import java.util.Collection; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -197,7 +198,7 @@ public class KMeans { // ************************************************************************* /** Converts a Tuple2<Double,Double> into a Point. */ - public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> { + public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> { @Override public Point map(Tuple2<Double, Double> t) throws Exception { @@ -206,7 +207,7 @@ public class KMeans { } /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */ - public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> { + public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> { @Override public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception { @@ -215,7 +216,7 @@ public class KMeans { } /** Determines the closest cluster center for a data point. */ - public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> { + public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> { private Collection<Centroid> centroids; /** Reads the centroid values from a broadcast variable into a collection. */ @@ -248,7 +249,7 @@ public class KMeans { } /** Appends a count variable to the tuple. */ - public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> { + public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> { @Override public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) { @@ -257,7 +258,7 @@ public class KMeans { } /** Sums and counts point coordinates. */ - public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> { + public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> { @Override public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) { @@ -266,7 +267,7 @@ public class KMeans { } /** Computes new centroid from coordinate sum and count of points. */ - public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> { + public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> { @Override public Centroid map(Tuple3<Integer, Point, Long> value) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java index b71347a..0d38e06 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java @@ -20,10 +20,11 @@ package org.apache.flink.example.java.graph; import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; @@ -127,7 +128,7 @@ public class ConnectedComponents implements ProgramDescription { * Function that turns a value into a 2-tuple where both fields are that value. */ @ConstantFields("0 -> 0,1") - public static final class DuplicateValue<T> extends MapFunction<T, Tuple2<T, T>> { + public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> { @Override public Tuple2<T, T> map(T vertex) { @@ -138,7 +139,7 @@ public class ConnectedComponents implements ProgramDescription { /** * Undirected edges by emitting for each input edge the input edges itself and an inverted version. */ - public static final class UndirectEdge extends FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>(); @Override @@ -157,7 +158,7 @@ public class ConnectedComponents implements ProgramDescription { */ @ConstantFieldsFirst("1 -> 0") @ConstantFieldsSecond("1 -> 1") - public static final class NeighborWithComponentIDJoin extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) { @@ -165,11 +166,10 @@ public class ConnectedComponents implements ProgramDescription { } } - /** - * The input is nested tuples ( (vertex-id, candidate-component) , (vertex-id, current-component) ) - */ + + @ConstantFieldsFirst("0") - public static final class ComponentIdFilter extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) { @@ -177,10 +177,10 @@ public class ConnectedComponents implements ProgramDescription { out.collect(candidate); } } - @Override - public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) { return null; } } + + @Override public String getDescription() { return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>"; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java index fba18fc..2d794bd 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.api.java.DataSet; @@ -119,7 +119,7 @@ public class EnumTrianglesBasic { // ************************************************************************* /** Converts a Tuple2 into an Edge */ - public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> { + public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { private final Edge outEdge = new Edge(); @Override @@ -130,7 +130,7 @@ public class EnumTrianglesBasic { } /** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */ - private static class EdgeByIdProjector extends MapFunction<Edge, Edge> { + private static class EdgeByIdProjector implements MapFunction<Edge, Edge> { @Override public Edge map(Edge inEdge) throws Exception { @@ -149,7 +149,7 @@ public class EnumTrianglesBasic { * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. * Assumes that input edges share the first vertex and are in ascending order of the second vertex. */ - private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> { + private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { private final List<Integer> vertices = new ArrayList<Integer>(); private final Triad outTriad = new Triad(); @@ -180,7 +180,7 @@ public class EnumTrianglesBasic { } /** Filters triads (three vertices connected by two edges) without a closing third edge. */ - private static class TriadFilter extends JoinFunction<Triad, Edge, Triad> { + private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> { @Override public Triad join(Triad triad, Edge edge) throws Exception {
