[FLINK-1023] Switch group-at-a-time functions to java.lang.Iterable (from java.util.Iterator). Iterables over transient data throw a TraversableOnceException when iterated over again.
This closes #84 Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/72d7b862 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/72d7b862 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/72d7b862 Branch: refs/heads/release-0.6 Commit: 72d7b86274c33d1570ffb22b1fca2081c15d753c Parents: 934e4e0 Author: Stephan Ewen <[email protected]> Authored: Tue Jul 29 15:58:44 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Aug 1 02:33:49 2014 +0200 ---------------------------------------------------------------------- .../api/avro/AvroWithEmptyArrayITCase.java | 7 +- .../mapred/record/example/WordCount.java | 2 +- .../example/WordCountWithOutputFormat.java | 2 +- .../spargel/java/VertexCentricIteration.java | 42 ++-- .../spargel/java/record/SpargelIteration.java | 2 +- .../compiler/CoGroupSolutionSetFirstTest.java | 7 +- .../compiler/GroupReduceCompilationTest.java | 15 +- .../flink/compiler/IterationsCompilerTest.java | 5 +- .../WorksetIterationsJavaApiCompilerTest.java | 10 +- .../testfunctions/IdentityGroupReducer.java | 8 +- .../testfunctions/Top1GroupReducer.java | 7 +- .../flink/compiler/util/DummyCoGroupStub.java | 1 - .../flink/compiler/util/IdentityReduce.java | 1 - .../api/common/functions/CoGroupFunction.java | 9 +- .../api/common/functions/CombineFunction.java | 3 +- .../common/functions/FlatCombineFunction.java | 3 +- .../common/functions/GroupReduceFunction.java | 3 +- .../flink/util/TraversableOnceException.java | 28 +++ .../example/java/graph/EnumTrianglesBasic.java | 10 +- .../example/java/graph/EnumTrianglesOpt.java | 12 +- .../flink/example/java/graph/PageRankBasic.java | 6 +- .../java/graph/TransitiveClosureNaive.java | 6 +- .../example/java/relational/WebLogAnalysis.java | 10 +- .../flink/api/java/ExecutionEnvironment.java | 21 ++ .../api/java/functions/GroupReduceIterator.java | 4 +- 
.../api/java/functions/RichCoGroupFunction.java | 4 +- .../java/functions/RichFlatCombineFunction.java | 4 +- .../java/functions/RichGroupReduceFunction.java | 9 +- .../api/java/operators/AggregateOperator.java | 3 +- .../api/java/operators/CoGroupOperator.java | 10 +- .../api/java/operators/DistinctOperator.java | 6 +- .../api/java/operators/GroupReduceOperator.java | 5 - .../PlanUnwrappingCoGroupOperator.java | 44 +--- .../translation/PlanUnwrappingJoinOperator.java | 8 - .../PlanUnwrappingReduceGroupOperator.java | 14 +- .../translation/TupleUnwrappingIterator.java | 15 +- .../operators/translation/WrappingFunction.java | 3 +- .../java/record/functions/CoGroupFunction.java | 4 +- .../java/record/functions/ReduceFunction.java | 7 +- .../java/record/operators/CoGroupOperator.java | 61 ++++- .../java/record/operators/ReduceOperator.java | 107 ++++++-- .../DeltaIterationTranslationTest.java | 4 +- .../record/CoGroupWrappingFunctionTest.java | 221 +++++++++++++++++ .../java/record/ReduceWrappingFunctionTest.java | 246 +++++++++++++++++++ .../java/type/extractor/TypeExtractorTest.java | 5 +- .../javaApiOperators/lambdas/CoGroupITCase.java | 11 +- .../lambdas/GroupReduceITCase.java | 8 +- .../CoGroupWithSolutionSetFirstDriver.java | 6 +- .../CoGroupWithSolutionSetSecondDriver.java | 5 +- .../runtime/operators/RegularPactTask.java | 1 - .../sort/CombiningUnilateralSortMerger.java | 16 +- .../operators/sort/FixedLengthRecordSorter.java | 2 +- .../runtime/operators/sort/InMemorySorter.java | 2 +- .../runtime/operators/sort/MergeIterator.java | 17 +- .../operators/sort/MergeMatchIterator.java | 1 - .../sort/SortMergeCoGroupIterator.java | 20 +- .../operators/util/CoGroupTaskIterator.java | 9 +- .../flink/runtime/util/EmptyIterator.java | 11 +- .../flink/runtime/util/KeyGroupedIterator.java | 23 +- .../util/MutableToRegularIteratorWrapper.java | 16 +- .../runtime/util/SingleElementIterator.java | 7 +- .../runtime/operators/CachedMatchTaskTest.java | 18 +- 
.../operators/CoGroupTaskExternalITCase.java | 19 +- .../runtime/operators/CoGroupTaskTest.java | 30 +-- .../runtime/operators/CombineTaskTest.java | 35 ++- .../flink/runtime/operators/MapTaskTest.java | 1 + .../operators/ReduceTaskExternalITCase.java | 35 ++- .../flink/runtime/operators/ReduceTaskTest.java | 50 ++-- .../operators/chaining/ChainTaskTest.java | 15 +- .../drivers/AllGroupReduceDriverTest.java | 16 +- .../drivers/GroupReduceDriverTest.java | 16 +- .../CombiningUnilateralSortMergerITCase.java | 24 +- .../sort/SortMergeCoGroupIteratorITCase.java | 4 +- .../runtime/util/KeyGroupedIteratorTest.java | 43 +++- .../api/scala/functions/ReduceFunction.scala | 4 +- .../api/scala/operators/CoGroupOperator.scala | 7 +- .../api/scala/operators/ReduceOperator.scala | 5 +- .../test/accumulators/AccumulatorITCase.java | 1 + .../AccumulatorIterativeITCase.java | 8 +- .../KMeansIterativeNepheleITCase.java | 3 +- .../BulkIterationWithAllReducerITCase.java | 8 +- .../CoGroupConnectedComponentsITCase.java | 4 +- .../CoGroupConnectedComponentsSecondITCase.java | 12 +- .../DependencyConnectedComponentsITCase.java | 30 +-- ...IterationTerminationWithTerminationTail.java | 1 - .../IterationTerminationWithTwoTails.java | 2 +- .../IterationWithAllReducerITCase.java | 1 - .../iterative/IterationWithChainingITCase.java | 118 +++++---- .../iterative/IterationWithUnionITCase.java | 81 ++---- ...nentsWithParametrizableAggregatorITCase.java | 35 ++- ...entsWithParametrizableConvergenceITCase.java | 20 +- .../ConnectedComponentsNepheleITCase.java | 5 +- .../IterationWithChainingNepheleITCase.java | 11 +- .../CustomCompensatableDotProductCoGroup.java | 11 +- .../CustomCompensatingMap.java | 1 + .../CustomRankCombiner.java | 6 +- .../CompensatableDotProductCoGroup.java | 4 +- .../test/javaApiOperators/CoGroupITCase.java | 94 +++---- .../javaApiOperators/GroupReduceITCase.java | 123 ++++------ .../flink/test/operators/CoGroupITCase.java | 29 +-- .../flink/test/operators/ReduceITCase.java 
| 23 +- .../recordJobTests/GroupOrderReduceITCase.java | 1 - .../graph/WorksetConnectedComponents.java | 2 +- 103 files changed, 1295 insertions(+), 820 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java index ea9edff..89db1fa 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java @@ -194,15 +194,14 @@ public class AvroWithEmptyArrayITCase extends RecordAPITestBase { private static final long serialVersionUID = 1L; @Override - public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) - throws Exception { + public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) { Record r1 = null; - if (records1.hasNext()) { + while (records1.hasNext()) { r1 = records1.next(); } Record r2 = null; - if (records2.hasNext()) { + while (records2.hasNext()) { r2 = records2.next(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java index 25caf0c..88b1892 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.hadoopcompatibility.mapred.record.example; import java.io.Serializable; @@ -103,6 +102,7 @@ public class WordCount implements Program, ProgramDescription { public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { Record element = null; int sum = 0; + while (records.hasNext()) { element = records.next(); int cnt = element.getField(1, IntValue.class).getValue(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java index a3cd3d5..8aaf8e5 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.hadoopcompatibility.mapred.record.example; import java.io.Serializable; @@ -101,6 +100,7 @@ public class WordCountWithOutputFormat implements Program, ProgramDescription { public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { Record element = null; int sum = 0; + while (records.hasNext()) { element = records.next(); int cnt = element.getField(1, IntValue.class).getValue(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java index 65be2f8..8f9149c 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java @@ -413,24 +413,28 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } @Override - public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex, + public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex, Collector<Tuple2<VertexKey, VertexValue>> out) throws Exception { - if (vertex.hasNext()) { - Tuple2<VertexKey, VertexValue> vertexState = vertex.next(); + final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator(); + + if (vertexIter.hasNext()) { + Tuple2<VertexKey, VertexValue> vertexState = vertexIter.next(); @SuppressWarnings("unchecked") - Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages; + Iterator<Tuple2<?, Message>> downcastIter = 
(Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator(); messageIter.setSource(downcastIter); vertexUpdateFunction.setOutput(vertexState, out); vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter); - } else { - if (messages.hasNext()) { + } + else { + final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator(); + if (messageIter.hasNext()) { String message = "Target vertex does not exist!."; try { - Tuple2<VertexKey, Message> next = messages.next(); + Tuple2<VertexKey, Message> next = messageIter.next(); message = "Target vertex '" + next.f0 + "' does not exist!."; } catch (Throwable t) {} throw new Exception(message); @@ -481,13 +485,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } @Override - public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges, - Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) + public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges, + Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) throws Exception { - if (state.hasNext()) { - Tuple2<VertexKey, VertexValue> newVertexState = state.next(); - messagingFunction.set((Iterator<?>) edges, out); + final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator(); + + if (stateIter.hasNext()) { + Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next(); + messagingFunction.set((Iterator<?>) edges.iterator(), out); messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1); } } @@ -534,13 +540,15 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver } @Override - public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges, - Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) + public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> edges, + Iterable<Tuple2<VertexKey, VertexValue>> 
state, Collector<Tuple2<VertexKey, Message>> out) throws Exception { - if (state.hasNext()) { - Tuple2<VertexKey, VertexValue> newVertexState = state.next(); - messagingFunction.set((Iterator<?>) edges, out); + final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator(); + + if (stateIter.hasNext()) { + Tuple2<VertexKey, VertexValue> newVertexState = stateIter.next(); + messagingFunction.set((Iterator<?>) edges.iterator(), out); messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java index 3a58afc..96bc799 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java @@ -161,7 +161,7 @@ public class SpargelIteration { @Override public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception { - + if (vertex.hasNext()) { Record first = vertex.next(); first.getFieldInto(0, vertexKey); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java index 3624d86..2a4d6d8 100644 --- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java @@ -16,11 +16,8 @@ * limitations under the License. */ - package org.apache.flink.compiler; -import java.util.Iterator; - import org.apache.flink.api.java.functions.RichCoGroupFunction; import org.apache.flink.api.java.functions.RichMapFunction; import org.junit.Assert; @@ -30,6 +27,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.plan.DualInputPlanNode; import org.apache.flink.compiler.plan.OptimizedPlan; @@ -44,8 +42,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase { public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> { @Override - public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception { - } + public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {} } public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java index b9cc769..9f63683 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java +++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java @@ -16,11 +16,8 @@ * limitations under the License. */ - package org.apache.flink.compiler; -import java.util.Iterator; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.functions.RichGroupReduceFunction; @@ -51,7 +48,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); data.reduceGroup(new RichGroupReduceFunction<Double, Double>() { - public void reduce(Iterator<Double> values, Collector<Double> out) {} + public void reduce(Iterable<Double> values, Collector<Double> out) {} }).name("reducer") .print().name("sink"); @@ -95,7 +92,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() { - public void reduce(Iterator<Long> values, Collector<Long> out) {} + public void reduce(Iterable<Long> values, Collector<Long> out) {} }).name("reducer"); reduced.setCombinable(true); @@ -148,7 +145,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java data .groupBy(1) .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { - public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer") .print().name("sink"); @@ -197,7 +194,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data .groupBy(1) .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, 
Tuple2<String, Double>>() { - public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer"); reduced.setCombinable(true); @@ -256,7 +253,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public String getKey(Tuple2<String, Double> value) { return value.f0; } }) .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { - public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer") .print().name("sink"); @@ -314,7 +311,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java public String getKey(Tuple2<String, Double> value) { return value.f0; } }) .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { - public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer"); reduced.setCombinable(true); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java index 8fc4324..828a635 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.compiler; import static org.junit.Assert.*; @@ -24,8 +23,6 @@ import static org.junit.Assert.*; import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.junit.Test; -import java.util.Iterator; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; @@ -287,7 +284,7 @@ public class IterationsCompilerTest extends CompilerTestBase { public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> { @Override - public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {} + public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {} } @ConstantFields("0") http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java index 64a4791..62d5d9b 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java @@ -16,17 +16,15 @@ * limitations under the License. 
*/ - package org.apache.flink.compiler; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.util.Iterator; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; @@ -214,7 +212,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { iter.getWorkset().join(invariantInput) .where(1, 2) .equalTo(1, 2) - .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { + .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { return first; } @@ -224,7 +222,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { result.join(iter.getSolutionSet()) .where(1, 0) .equalTo(0, 2) - .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { + .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { return second; } @@ -283,7 +281,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2) .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() { - public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {} + public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {} }) 
.name(NEXT_WORKSET_REDUCER_NAME) .withConstantSet("1->1","2->2","0->0"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java index 42275af..e06e3ba 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java @@ -16,10 +16,8 @@ * limitations under the License. */ - package org.apache.flink.compiler.testfunctions; -import java.util.Iterator; import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; @@ -32,9 +30,9 @@ public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<T> values, Collector<T> out) { - while (values.hasNext()) { - out.collect(values.next()); + public void reduce(Iterable<T> values, Collector<T> out) { + for (T next : values) { + out.collect(next); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java index 3f24e65..b1a0e2d 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java +++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java @@ -16,11 +16,8 @@ * limitations under the License. */ - package org.apache.flink.compiler.testfunctions; -import java.util.Iterator; - import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; @@ -32,7 +29,7 @@ public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<T> values, Collector<T> out) { - out.collect(values.next()); + public void reduce(Iterable<T> values, Collector<T> out) { + out.collect(values.iterator().next()); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java index 6ef1651..13cd37b 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCoGroupStub.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.compiler.util; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java index 3f32423..b78a850 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/IdentityReduce.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.compiler.util; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java index cc8fe78..1a0abeb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.functions; import java.io.Serializable; -import java.util.Iterator; import org.apache.flink.util.Collector; @@ -41,11 +40,11 @@ import org.apache.flink.util.Collector; * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked * with in empty input for the side of the data set that did not contain elements with that specific key. * - * @param <V1> The data type of the first input data set. - * @param <V2> The data type of the second input data set. 
+ * @param <IN1> The data type of the first input data set. + * @param <IN2> The data type of the second input data set. * @param <O> The data type of the returned elements. */ -public interface CoGroupFunction<V1, V2, O> extends Function, Serializable { +public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable { /** * This method must be implemented to provide a user implementation of a @@ -59,5 +58,5 @@ public interface CoGroupFunction<V1, V2, O> extends Function, Serializable { * @throws Exception The function may throw Exceptions, which will cause the program to cancel, * and may trigger the recovery logic. */ - void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception; + public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java index aed7fae..0588526 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.functions; import java.io.Serializable; -import java.util.Iterator; /** * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction} @@ -45,5 +44,5 @@ public interface CombineFunction<T> extends Function, Serializable { * @throws Exception The function may throw Exceptions, which will cause the program to cancel, * and may trigger the recovery logic. 
*/ - T combine(Iterator<T> values) throws Exception; + T combine(Iterable<T> values) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java index edf1614..bbcbd0a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.functions; import java.io.Serializable; -import java.util.Iterator; import org.apache.flink.util.Collector; @@ -47,5 +46,5 @@ public interface FlatCombineFunction<T> extends Function, Serializable { * @throws Exception The function may throw Exceptions, which will cause the program to cancel, * and may trigger the recovery logic. 
*/ - void combine(Iterator<T> values, Collector<T> out) throws Exception; + void combine(Iterable<T> values, Collector<T> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java index 5fa959c..54cc5c7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.functions; import java.io.Serializable; -import java.util.Iterator; import org.apache.flink.util.Collector; @@ -53,5 +52,5 @@ public interface GroupReduceFunction<T, O> extends Function, Serializable { * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ - void reduce(Iterator<T> values, Collector<O> out) throws Exception; + void reduce(Iterable<T> values, Collector<O> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java new file mode 100644 index 0000000..a15c31c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/TraversableOnceException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +public class TraversableOnceException extends RuntimeException { + + private static final long serialVersionUID = 7636881584773577290L; + + public TraversableOnceException() { + super("The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java index 2d794bd..3d68e99 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java @@ -35,7 +35,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Edge; import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad; /** - * Triangle enumeration is a preprocessing step 
to find closely connected parts in graphs. + * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. * A triangle consists of three edges that connect three vertices with each other. * * <p> @@ -154,7 +154,9 @@ public class EnumTrianglesBasic { private final Triad outTriad = new Triad(); @Override - public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception { + public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception { + + final Iterator<Edge> edges = edgesIter.iterator(); // clear vertex list vertices.clear(); @@ -165,11 +167,11 @@ public class EnumTrianglesBasic { vertices.add(firstEdge.getSecondVertex()); // build and emit triads - while(edges.hasNext()) { + while (edges.hasNext()) { Integer higherVertexId = edges.next().getSecondVertex(); // combine vertex with all previously read vertices - for(Integer lowerVertexId : vertices) { + for (Integer lowerVertexId : vertices) { outTriad.setSecondVertex(lowerVertexId); outTriad.setThirdVertex(higherVertexId); out.collect(outTriad); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java index c0ea26a..efccb59 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java @@ -38,7 +38,7 @@ import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.EdgeWithD import org.apache.flink.example.java.graph.util.EnumTrianglesDataTypes.Triad; /** - * 
Triangle enumeration is a preprocessing step to find closely connected parts in graphs. + * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. * A triangle consists of three edges that connect three vertices with each other. * * <p> @@ -166,8 +166,9 @@ public class EnumTrianglesOpt { final EdgeWithDegrees outputEdge = new EdgeWithDegrees(); @Override - public void reduce(Iterator<Edge> edges, Collector<EdgeWithDegrees> out) throws Exception { + public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) { + Iterator<Edge> edges = edgesIter.iterator(); otherVertices.clear(); // get first edge @@ -176,7 +177,7 @@ public class EnumTrianglesOpt { this.otherVertices.add(edge.getSecondVertex()); // get all other edges (assumes edges are sorted by second vertex) - while(edges.hasNext()) { + while (edges.hasNext()) { edge = edges.next(); Integer otherVertex = edge.getSecondVertex(); // collect unique vertices @@ -274,7 +275,8 @@ public class EnumTrianglesOpt { private final Triad outTriad = new Triad(); @Override - public void reduce(Iterator<Edge> edges, Collector<Triad> out) throws Exception { + public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception { + final Iterator<Edge> edges = edgesIter.iterator(); // clear vertex list vertices.clear(); @@ -285,7 +287,7 @@ public class EnumTrianglesOpt { vertices.add(firstEdge.getSecondVertex()); // build and emit triads - while(edges.hasNext()) { + while (edges.hasNext()) { Integer higherVertexId = edges.next().getSecondVertex(); // combine vertex with all previously read vertices http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java index ba9754f..e6a9272 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java @@ -21,7 +21,6 @@ package org.apache.flink.example.java.graph; import static org.apache.flink.api.java.aggregation.Aggregations.SUM; import java.util.ArrayList; -import java.util.Iterator; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -163,12 +162,11 @@ public class PageRankBasic { private final ArrayList<Long> neighbors = new ArrayList<Long>(); @Override - public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) { + public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) { neighbors.clear(); Long id = 0L; - while (values.hasNext()) { - Tuple2<Long, Long> n = values.next(); + for (Tuple2<Long, Long> n : values) { id = n.f0; neighbors.add(n.f1); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java index 22054da..e9ba406 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java @@ -25,8 +25,6 @@ import org.apache.flink.api.java.DataSet; import 
org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.IterativeDataSet; -import java.util.Iterator; - import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.example.java.graph.util.ConnectedComponentsData; @@ -70,8 +68,8 @@ public class TransitiveClosureNaive implements ProgramDescription { .groupBy(0, 1) .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { @Override - public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(values.next()); + public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception { + out.collect(values.iterator().next()); } }); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java index 3033c0d..9ca6ea9 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java @@ -16,10 +16,8 @@ * limitations under the License. 
*/ - package org.apache.flink.example.java.relational; -import java.util.Iterator; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; @@ -249,12 +247,12 @@ public class WebLogAnalysis { * 2: AVG_DURATION */ @Override - public void coGroup(Iterator<Tuple3<Integer, String, Integer>> ranks, Iterator<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) { + public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) { // Check if there is a entry in the visits relation - if (!visits.hasNext()) { - while (ranks.hasNext()) { + if (!visits.iterator().hasNext()) { + for (Tuple3<Integer, String, Integer> next : ranks) { // Emit all rank pairs - out.collect(ranks.next()); + out.collect(next); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index af0ea52..bcc523d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; +import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.api.java.io.CsvReader; @@ -253,6 +254,26 @@ public abstract class ExecutionEnvironment { 
public CsvReader readCsvFile(String filePath) { return new CsvReader(filePath, this); } + + // ------------------------------------ File Input Format ----------------------------------------- + + public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) { + if (inputFormat == null) { + throw new IllegalArgumentException("InputFormat must not be null."); + } + if (filePath == null) { + throw new IllegalArgumentException("The file path must not be null."); + } + + inputFormat.setFilePath(new Path(filePath)); + try { + return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat)); + } + catch (Exception e) { + throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " + + "Please specify the TypeInformation of the produced type explicitly."); + } + } // ----------------------------------- Generic Input Format --------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java index b363606..087808d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java @@ -28,13 +28,13 @@ public abstract class GroupReduceIterator<IN, OUT> extends RichGroupReduceFuncti private static final long serialVersionUID = 1L; - public abstract Iterator<OUT> reduceGroup(Iterator<IN> values) throws Exception; + public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws Exception; // ------------------------------------------------------------------------------------------- 
@Override - public final void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception { + public final void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception { for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); ) { out.collect(iter.next()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java index e78c31e..3169622 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java.functions; -import java.util.Iterator; - import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichFunction; @@ -40,6 +38,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun private static final long serialVersionUID = 1L; @Override - public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception; + public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java 
index 97c15ff..a5d45aa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java @@ -24,8 +24,6 @@ import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; -import java.util.Iterator; - /** * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: @@ -39,5 +37,5 @@ public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction im private static final long serialVersionUID = 1L; @Override - public abstract void combine(Iterator<T> values, Collector<T> out) throws Exception; + public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java index 801f592..9198aeb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java @@ -22,7 +22,6 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatCombineFunction; @@ -44,7 +43,7 @@ public abstract 
class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct private static final long serialVersionUID = 1L; @Override - public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception; + public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception; /** * The combine methods pre-reduces elements. It may be called on subsets of the data @@ -59,7 +58,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct * <p> * Since the reduce function will be called on the result of this method, it is important that this * method returns the same data type as it consumes. By default, this method only calls the - * {@link #reduce(Iterator, Collector)} method. If the behavior in the pre-reducing is different + * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different * from the final reduce function (for example because the reduce function changes the data type), * this method must be overwritten, or the execution will fail. * @@ -70,7 +69,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct * to fail and may trigger recovery. */ @Override - public void combine(Iterator<IN> values, Collector<IN> out) throws Exception { + public void combine(Iterable<IN> values, Collector<IN> out) throws Exception { @SuppressWarnings("unchecked") Collector<OUT> c = (Collector<OUT>) out; reduce(values, c); @@ -80,7 +79,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct /** * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in oder to mark - * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterator, Collector)} + * them as "combinable". 
The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)} * method on such functions, to pre-reduce the data before transferring it over the network to * the actual group reduce operation. * <p> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index 80a5fa0..1ceb8c8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -271,13 +271,14 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate } @Override - public void reduce(Iterator<T> values, Collector<T> out) { + public void reduce(Iterable<T> records, Collector<T> out) { final AggregationFunction<Object>[] aggFunctions = this.aggFunctions; final int[] fieldPositions = this.fieldPositions; // aggregators are initialized from before T current = null; + final Iterator<T> values = records.iterator(); while (values.hasNext()) { current = values.next(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 744893b..a890b32 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -301,7 +301,7 @@ public class 
CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU /** * Intermediate step of a CoGroup transformation. <br/> * To continue the CoGroup transformation, select the grouping key of the first input {@link DataSet} by calling - * {@link CoGroupOperatorSets#where()} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}. + * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets#where(KeySelector)}. * * @param <I1> The type of the first input DataSet of the CoGroup transformation. * @param <I2> The type of the second input DataSet of the CoGroup transformation. @@ -328,7 +328,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @param field0 The first index of the Tuple fields of the first co-grouped DataSets that should be used as key * @param fields The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys. * @return An incomplete CoGroup transformation. - * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup. * * @see Tuple * @see DataSet @@ -348,7 +348,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * @param field0 The first field of the Tuple fields of the first co-grouped DataSets that should be used as key * @param fields The fields of the first co-grouped DataSets that should be used as keys. * @return An incomplete CoGroup transformation. - * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup. 
* * @see Tuple * @see DataSet @@ -367,7 +367,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. * @return An incomplete CoGroup transformation. - * Call {@link CoGroupOperatorSetsPredicate#equalTo()} to continue the CoGroup. + * Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} to continue the CoGroup. * * @see KeySelector * @see DataSet @@ -381,7 +381,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU /** * Intermediate step of a CoGroup transformation. <br/> * To continue the CoGroup transformation, select the grouping key of the second input {@link DataSet} by calling - * {@link CoGroupOperatorSetsPredicate#equalTo(int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}. + * {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(int, int...)} or {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate#equalTo(KeySelector)}. 
* */ public final class CoGroupOperatorSetsPredicate { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 7646fa0..fd35773 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java.operators; -import java.util.Iterator; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -148,8 +146,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<T> values, Collector<T> out) { - out.collect(values.next()); + public void reduce(Iterable<T> values, Collector<T> out) { + out.collect(values.iterator().next()); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index e1424ad..7b0ad4d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -37,7 +37,6 @@ import org.apache.flink.types.TypeInformation; 
import org.apache.flink.api.java.DataSet; - /** * This operator represents the application of a "reduceGroup" function on a data set, and the * result data set produced by the function. @@ -185,7 +184,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT else { throw new UnsupportedOperationException("Unrecognized key type."); } - } @@ -215,7 +213,4 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return reducer; } - - - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java index 89290f0..36bae6b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java.operators.translation; -import java.util.Iterator; - import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; @@ -78,47 +76,11 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> @Override - public void coGroup(Iterator<Tuple2<K, I1>> records1, Iterator<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception { - iter1.set(records1); - iter2.set(records2); + public void coGroup(Iterable<Tuple2<K, I1>> records1, Iterable<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception { + iter1.set(records1.iterator()); + 
iter2.set(records2.iterator()); this.wrappedFunction.coGroup(iter1, iter2, out); } } - - public static class UnwrappingKeyIterator<K, I1> implements Iterator<I1> { - - private Iterator<Tuple2<K, I1>> outerIterator; - I1 firstValue; - - public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1) { - this.outerIterator = records1; - this.firstValue = null; - } - - public UnwrappingKeyIterator(Iterator<Tuple2<K, I1>> records1, I1 firstValue ) { - this.outerIterator = records1; - this.firstValue = firstValue; - } - - @Override - public boolean hasNext() { - return firstValue != null || outerIterator.hasNext(); - } - - @Override - public I1 next() { - if(firstValue != null) { - firstValue = null; - return firstValue; - } - return outerIterator.next().getField(1); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java index 73ea004..efd52d5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java @@ -68,18 +68,10 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> super(wrapped); } - //@SuppressWarnings("unchecked") - //@Override - //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception { - // return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1))); - //} - @SuppressWarnings("unchecked") @Override public void join (Tuple2<K, I1> value1, 
Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception { wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector); } - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 29eb5ed..7e3f0e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java.operators.translation; -import java.util.Iterator; - import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -65,14 +63,14 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp @Override - public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception { - iter.set(values); + public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception { + iter.set(values.iterator()); this.wrappedFunction.reduce(iter, out); } @Override - public void combine(Iterator<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception { - iter.set(values); + public void combine(Iterable<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception { + iter.set(values.iterator()); coll.set(out); this.wrappedFunction.combine(iter, coll); } @@ -98,8 +96,8 @@ public class 
PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp @Override - public void reduce(Iterator<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception { - iter.set(values); + public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception { + iter.set(values.iterator()); this.wrappedFunction.reduce(iter, out); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java index 1f2c208..c09f3a8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java @@ -21,20 +21,23 @@ package org.apache.flink.api.java.operators.translation; import java.util.Iterator; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TraversableOnceException; /** * An iterator that reads 2-tuples (key value pairs) and returns only the values (second field). * The iterator also tracks the keys, as the pairs flow though it. 
*/ -public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Serializable { +public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable { private static final long serialVersionUID = 1L; private K lastKey; private Iterator<Tuple2<K, T>> iterator; + private boolean iteratorAvailable; public void set(Iterator<Tuple2<K, T>> iterator) { this.iterator = iterator; + this.iteratorAvailable = true; } public K getLastKey() { @@ -57,4 +60,14 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, java.io.Seria public void remove() { throw new UnsupportedOperationException(); } + + @Override + public Iterator<T> iterator() { + if (iteratorAvailable) { + iteratorAvailable = false; + return this; + } else { + throw new TraversableOnceException(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java index 267d879..748e3f3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java @@ -42,7 +42,7 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF private static final long serialVersionUID = 1L; - protected final T wrappedFunction; + protected T wrappedFunction; protected WrappingFunction(T wrappedFunction) { this.wrappedFunction = wrappedFunction; @@ -174,6 +174,5 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF public <T extends Value> T getPreviousIterationAggregate(String name) { return 
((IterationRuntimeContext) context).<T>getPreviousIterationAggregate(name); } - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java index e9b5c25..fc7ef49 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java @@ -28,7 +28,7 @@ import org.apache.flink.util.Collector; /** * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}. */ -public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> { +public abstract class CoGroupFunction extends AbstractRichFunction { private static final long serialVersionUID = 1L; @@ -45,7 +45,5 @@ public abstract class CoGroupFunction extends AbstractRichFunction implements or * runtime catches an exception, it aborts the task and lets the fail-over logic * decide whether to retry the task execution. 
*/ - @Override public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception; - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java index a1e6369..9848499 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java @@ -16,14 +16,11 @@ * limitations under the License. */ - package org.apache.flink.api.java.record.functions; import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatCombineFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; @@ -31,7 +28,7 @@ import org.apache.flink.util.Collector; * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a * {@link org.apache.flink.api.java.record.operators.ReduceOperator}. */ -public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> { +public abstract class ReduceFunction extends AbstractRichFunction { private static final long serialVersionUID = 1L; @@ -47,7 +44,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro * runtime catches an exception, it aborts the reduce task and lets the fail-over logic * decide whether to retry the reduce execution. 
*/ - @Override public abstract void reduce(Iterator<Record> records, Collector<Record> out) throws Exception; /** @@ -71,7 +67,6 @@ public abstract class ReduceFunction extends AbstractRichFunction implements Gro * runtime catches an exception, it aborts the combine task and lets the fail-over logic * decide whether to retry the combiner execution. */ - @Override public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { // to be implemented, if the reducer should use a combiner. Note that the combining method // is only used, if the stub class is further annotated with the annotation http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java index a8cedeb..c958b84 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java @@ -16,9 +16,11 @@ * limitations under the License. 
*/ - package org.apache.flink.api.java.record.operators; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,13 +31,15 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.RecordOperator; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.record.functions.CoGroupFunction; import org.apache.flink.api.java.record.functions.FunctionAnnotation; import org.apache.flink.types.Key; import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; +import org.apache.flink.util.InstantiationUtil; /** * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records sharing @@ -43,7 +47,7 @@ import org.apache.flink.types.Record; * * @see CoGroupFunction */ -public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, CoGroupFunction> implements RecordOperator { +public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> implements RecordOperator { /** * The types of the keys that the operator groups on. @@ -61,7 +65,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, * @param keyColumn2 The position of the key in the second input's records. */ public static Builder builder(CoGroupFunction udf, Class<? 
extends Key<?>> keyClass, int keyColumn1, int keyColumn2) { - return new Builder(new UserCodeObjectWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2); + WrappingCoGroupFunction wrapper = new WrappingCoGroupFunction(udf); + return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2); } /** @@ -75,7 +80,8 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, public static Builder builder(Class<? extends CoGroupFunction> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) { - return new Builder(new UserCodeClassWrapper<CoGroupFunction>(udf), keyClass, keyColumn1, keyColumn2); + WrappingCoGroupFunction wrapper = new WrappingClassCoGroupFunction(udf); + return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2); } /** @@ -96,7 +102,9 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, setBroadcastVariables(builder.broadcastInputs); setGroupOrderForInputOne(builder.secondaryOrder1); setGroupOrderForInputTwo(builder.secondaryOrder2); - setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf)); + + CoGroupFunction function = ((WrappingCoGroupFunction) builder.udf.getUserCodeObject()).getWrappedFunction(); + setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new UserCodeObjectWrapper<CoGroupFunction>(function))); } // -------------------------------------------------------------------------------------------- @@ -115,7 +123,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, public static class Builder { /* The required parameters */ - private final UserCodeWrapper<CoGroupFunction> udf; + private final UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, 
Record>> udf; private final List<Class<? extends Key<?>>> keyClasses; private final List<Integer> keyColumns1; private final List<Integer> keyColumns2; @@ -136,7 +144,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, * @param keyColumn1 The position of the key in the first input's records. * @param keyColumn2 The position of the key in the second input's records. */ - protected Builder(UserCodeWrapper<CoGroupFunction> udf, Class<? extends Key<?>> keyClass, + protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) { this.udf = udf; @@ -157,7 +165,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, * * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. */ - protected Builder(UserCodeWrapper<CoGroupFunction> udf) { + protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf) { this.udf = udf; this.keyClasses = new ArrayList<Class<? 
extends Key<?>>>(); this.keyColumns1 = new ArrayList<Integer>(); @@ -338,4 +346,39 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, return new CoGroupOperator(this); } } + + // ============================================================================================ + + public static class WrappingCoGroupFunction extends WrappingFunction<CoGroupFunction> + implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> { + + private static final long serialVersionUID = 1L; + + public WrappingCoGroupFunction(CoGroupFunction coGrouper) { + super(coGrouper); + } + + @Override + public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) throws Exception { + this.wrappedFunction.coGroup(records1.iterator(), records2.iterator(), out); + } + } + + public static final class WrappingClassCoGroupFunction extends WrappingCoGroupFunction { + + private static final long serialVersionUID = 1L; + + public WrappingClassCoGroupFunction(Class<? extends CoGroupFunction> reducer) { + super(InstantiationUtil.instantiate(reducer)); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeObject(wrappedFunction.getClass()); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + Class<?> clazz = (Class<?>) in.readObject(); + this.wrappedFunction = (CoGroupFunction) InstantiationUtil.instantiate(clazz); + } + } }
