http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java index a325e33..a29d4e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java @@ -16,10 +16,8 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.drivers; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -152,11 +150,10 @@ public class GroupReduceDriverTest { public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { @Override - public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) { - Tuple2<String, Integer> current = values.next(); + public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) { + Tuple2<String, Integer> current = new Tuple2<String, Integer>("", 0); - while (values.hasNext()) { - Tuple2<String, Integer> next = values.next(); + for (Tuple2<String, Integer> next : values) { next.f0 = current.f0 + next.f0; next.f1 = current.f1 + next.f1; current = next; @@ -169,11 +166,10 @@ public class GroupReduceDriverTest { public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> { @Override - public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) { - Tuple2<StringValue, IntValue> current = values.next(); + public void reduce(Iterable<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) { + Tuple2<StringValue, IntValue> current = new Tuple2<StringValue, IntValue>(new StringValue(""), new IntValue(0)); - while (values.hasNext()) { - Tuple2<StringValue, IntValue> next = values.next(); + for (Tuple2<StringValue, IntValue> next : values) { next.f0.append(current.f0); next.f1.setValue(current.f1.getValue() + next.f1.getValue()); current = next;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 7020f89..4b3c197 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.sort; import java.io.IOException; @@ -26,13 +25,12 @@ import java.util.Iterator; import java.util.NoSuchElementException; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.record.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator; import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory; import org.apache.flink.configuration.Configuration; @@ -273,7 +271,7 @@ public class CombiningUnilateralSortMergerITCase { // -------------------------------------------------------------------------------------------- - public static class TestCountCombiner extends ReduceFunction { + public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> { private static final long serialVersionUID = 1L; private final IntValue count = new IntValue(); @@ -284,11 +282,11 @@ public class CombiningUnilateralSortMergerITCase { @Override - public void combine(Iterator<Record> values, Collector<Record> out) { + public void combine(Iterable<Record> values, Collector<Record> out) { Record rec = null; int cnt = 0; - while (values.hasNext()) { - rec = values.next(); + for (Record next : values) { + rec = next; cnt += rec.getField(1, IntValue.class).getValue(); } @@ -298,7 +296,7 @@ public class CombiningUnilateralSortMergerITCase { } @Override - public void reduce(Iterator<Record> values, Collector<Record> out) {} + public void reduce(Iterable<Record> values, Collector<Record> out) {} @Override public void open(Configuration parameters) throws Exception { @@ -311,7 +309,7 @@ public class CombiningUnilateralSortMergerITCase { } } - public static class TestCountCombiner2 extends ReduceFunction { + public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> { private static final long serialVersionUID = 1L; public volatile boolean opened = false; @@ -319,11 +317,11 @@ public class CombiningUnilateralSortMergerITCase { public volatile boolean closed = false; @Override - public void combine(Iterator<Record> values, Collector<Record> out) { + public void combine(Iterable<Record> values, Collector<Record> out) { Record rec = null; int cnt = 0; - while (values.hasNext()) { - rec = values.next(); + for (Record next : values) { + rec = next; cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString()); } @@ -331,7 +329,7 @@ public class CombiningUnilateralSortMergerITCase { } @Override - public void reduce(Iterator<Record> values, Collector<Record> out) { + public void reduce(Iterable<Record> values, Collector<Record> out) { // yo, nothing, mon } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java index a66fb5d..48a4b91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeCoGroupIteratorITCase.java @@ -116,8 +116,8 @@ public class SortMergeCoGroupIteratorITCase final TestData.Key key = new TestData.Key(); while (iterator.next()) { - Iterator<Record> iter1 = iterator.getValues1(); - Iterator<Record> iter2 = iterator.getValues2(); + Iterator<Record> iter1 = iterator.getValues1().iterator(); + Iterator<Record> iter2 = iterator.getValues2().iterator(); TestData.Value v1 = null; TestData.Value v2 = null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java index e9010d2..27fa540 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; import org.apache.flink.types.StringValue; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TraversableOnceException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -38,17 +39,15 @@ import org.junit.Test; /** * Test for the key grouped iterator, which advances in windows containing the same key and provides a sub-iterator * over the records with the same key. - * */ -public class KeyGroupedIteratorTest -{ +public class KeyGroupedIteratorTest { + private MutableObjectIterator<Record> sourceIter; // the iterator that provides the input private KeyGroupedIterator<Record> psi; // the grouping iterator, progressing in key steps @Before - public void setup() - { + public void setup() { final ArrayList<IntStringPair> source = new ArrayList<IntStringPair>(); // add elements to the source @@ -91,8 +90,7 @@ public class KeyGroupedIteratorTest } @Test - public void testNextKeyOnly() throws Exception - { + public void testNextKeyOnly() throws Exception { try { Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1)))); @@ -131,6 +129,8 @@ public class KeyGroupedIteratorTest try { // Key 1, Value A Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -139,6 +139,8 @@ public class KeyGroupedIteratorTest // Key 2, Value B Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -147,6 +149,8 @@ public class KeyGroupedIteratorTest // Key 3, Values C, D Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -173,6 +177,8 @@ public class KeyGroupedIteratorTest // Key 4, Values E, F, G Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -193,6 +199,8 @@ public class KeyGroupedIteratorTest // Key 5, Values H, I, J, K, L Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -231,6 +239,7 @@ public class KeyGroupedIteratorTest Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + Assert.assertNull(this.psi.getValues()); } catch (Exception e) { e.printStackTrace(); Assert.fail("The test encountered an unexpected exception."); @@ -244,12 +253,18 @@ public class KeyGroupedIteratorTest // Progression only via nextKey() and hasNext() - Key 1, Value A Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); // Progression only through nextKey() - Key 2, Value B Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); // Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); @@ -260,6 +275,8 @@ public class KeyGroupedIteratorTest // Progression first via next() only, then hasNext() only Key 4, Values E, F, G Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class)); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); @@ -270,6 +287,8 @@ public class KeyGroupedIteratorTest Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue(hasIterator(this.psi.getValues())); + Assert.assertFalse(hasIterator(this.psi.getValues())); Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); // end @@ -355,4 +374,14 @@ public class KeyGroupedIteratorTest return string; } } + + public boolean hasIterator(Iterable<?> iterable) { + try { + iterable.iterator(); + return true; + } + catch (TraversableOnceException e) { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala index 6ea8cbf..de7b7d1 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/ReduceFunction.scala @@ -19,10 +19,10 @@ package org.apache.flink.api.scala.functions -import java.util.{ Iterator => JIterator } - import scala.Iterator +import java.util.{Iterator => JIterator} + import org.apache.flink.api.scala.analysis.{UDTSerializer, FieldSelector, UDT} import org.apache.flink.api.scala.analysis.UDF1 http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala index 4c26307..6a71bb7 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CoGroupOperator.scala @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.scala.operators import language.experimental.macros @@ -54,7 +53,7 @@ class CoGroupDataSetWithWhereAndEqual[LeftIn, RightIn](val leftKeySelection: Lis def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CoGroupMacros.flatMap[LeftIn, RightIn, Out] } -class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(s)) +class NoKeyCoGroupBuilder(s: JCoGroupFunction) extends CoGroupOperator.Builder(new UserCodeObjectWrapper(new CoGroupOperator.WrappingCoGroupFunction(s))) object CoGroupMacros { @@ -106,7 +105,9 @@ object CoGroupMacros { implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice + new CoGroupFunctionBase[LeftIn, RightIn, Out] { + override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = { val firstLeftRecord = leftIterator.initialize(leftRecords) @@ -177,7 +178,9 @@ object CoGroupMacros { implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice + new CoGroupFunctionBase[LeftIn, RightIn, Out] { + override def coGroup(leftRecords: JIterator[Record], rightRecords: JIterator[Record], out: Collector[Record]) = { val firstLeftRecord = leftIterator.initialize(leftRecords) val firstRightRecord = rightIterator.initialize(rightRecords) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala index cf3a96c..f25b5a3 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.scala.operators import language.experimental.macros @@ -191,7 +190,9 @@ object ReduceMacros { implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice new ReduceFunctionBase[In, Out] { - override def reduce(records: JIterator[Record], out: Collector[Record]) = { + override def reduce(recordsIterable: JIterator[Record], out: Collector[Record]) = { + val records: JIterator[Record] = recordsIterable + if (records.hasNext) { val firstRecord = reduceIterator.initialize(records) reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java index e76f86d..5497004 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java @@ -270,6 +270,7 @@ public class AccumulatorITCase extends RecordAPITestBase { private void reduceInternal(Iterator<Record> records, Collector<Record> out) { Record element = null; int sum = 0; + while (records.hasNext()) { element = records.next(); IntValue i = element.getField(1, IntValue.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java index bf6c37d..197a745 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java @@ -124,11 +124,13 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase { } @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { + public void reduce(Iterator<Record> records, Collector<Record> out) { // Compute the sum int sum = 0; - while (it.hasNext()) { - Integer value = Integer.parseInt(it.next().getField(0, StringValue.class).getValue()); + + while (records.hasNext()) { + Record r = records.next(); + Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue()); sum += value; testCounter.add(value); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java index 4b3a4bf..2217679 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.record.io.CsvInputFormat; +import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory; import org.apache.flink.configuration.Configuration; @@ -252,7 +253,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { tailConfig.setOutputSerializer(outputSerializer); // the udf - tailConfig.setStubWrapper(new UserCodeObjectWrapper<RecomputeClusterCenter>(new RecomputeClusterCenter())); + tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new RecomputeClusterCenter()))); return tail; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java index 35aea4b..b442b33 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java @@ -16,12 +16,10 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.java.functions.RichGroupReduceFunction; @@ -72,15 +70,13 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase { } @Override - public void reduce(Iterator<Integer> records, Collector<Integer> out) { + public void reduce(Iterable<Integer> records, Collector<Integer> out) { if (bcValue == null) { return; } final int x = bcValue; - while (records.hasNext()) { - int y = records.next(); - + for (Integer y : records) { if (y > x) { out.collect(y); return; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java index 37695b2..f2a43a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.io.BufferedReader; @@ -104,7 +103,8 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase { long minimumComponentID = Long.MAX_VALUE; while (candidates.hasNext()) { - long candidateComponentID = candidates.next().getField(1, LongValue.class).getValue(); + Record candidate = candidates.next(); + long candidateComponentID = candidate.getField(1, LongValue.class).getValue(); if (candidateComponentID < minimumComponentID) { minimumComponentID = candidateComponentID; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java index 49bdb26..c6175b2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java @@ -16,11 +16,9 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.java.functions.RichCoGroupFunction; @@ -116,17 +114,17 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override - public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) { - if (!current.hasNext()) { + public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) { + if (!current.iterator().hasNext()) { throw new RuntimeException("Error: Id not encountered before."); } - Tuple2<Long, Long> old = current.next(); + Tuple2<Long, Long> old = current.iterator().next(); long minimumComponentID = Long.MAX_VALUE; - while (candidates.hasNext()) { - long candidateComponentID = candidates.next().f1; + for (Tuple2<Long, Long> candidate : candidates) { + long candidateComponentID = candidate.f1; if (candidateComponentID < minimumComponentID) { minimumComponentID = candidateComponentID; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java index d2669aa..fdfb321 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java @@ -16,11 +16,9 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.java.functions.RichFlatMapFunction; @@ -170,8 +168,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<Long> values, Collector<Long> out) throws Exception { - out.collect(values.next()); + public void reduce(Iterable<Long> values, Collector<Long> out) { + out.collect(values.iterator().next()); } } @@ -204,21 +202,19 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase { final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); @Override - public void reduce(Iterator<Tuple2<Long, Long>> values, - Collector<Tuple2<Long, Long>> out) throws Exception { + public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { + Long vertexId = 0L; + Long minimumCompId = Long.MAX_VALUE; - final Tuple2<Long, Long> first = values.next(); - final Long vertexId = first.f0; - Long minimumCompId = first.f1; - - while ( values.hasNext() ) { - Long candidateCompId = values.next().f1; - if ( candidateCompId < minimumCompId ) { + for (Tuple2<Long, Long> value: values) { + vertexId = value.f0; + Long candidateCompId = value.f1; + if (candidateCompId < minimumCompId) { minimumCompId = candidateCompId; } } - resultVertex.setField(vertexId, 0); - resultVertex.setField(minimumCompId, 1); + resultVertex.f0 = vertexId; + resultVertex.f1 = minimumCompId; out.collect(resultVertex); } @@ -231,8 +227,8 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase { @Override public void flatMap( Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId, - Collector<Tuple2<Long, Long>> out) throws Exception { - + Collector<Tuple2<Long, Long>> out) + { if ( vertexWithNewAndOldId.f0.f1 < vertexWithNewAndOldId.f1.f1 ) { out.collect(vertexWithNewAndOldId.f0); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java index cb25019..3dc0bdf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTerminationTail.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java index 6a487c4..2c777f5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.io.Serializable; @@ -110,6 +109,7 @@ public class IterationTerminationWithTwoTails extends RecordAPITestBase { public void reduce(Iterator<Record> it, Collector<Record> out) { // Compute the sum int sum = 0; + while (it.hasNext()) { sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java index c81b32a..b3145dc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java index b62d85a..d00602f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java @@ -44,85 +44,81 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class IterationWithChainingITCase extends RecordAPITestBase { - private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n"; + private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n"; - private String dataPath; - private String resultPath; + private String dataPath; + private String resultPath; - public IterationWithChainingITCase(Configuration config) { - super(config); + public IterationWithChainingITCase(Configuration config) { + super(config); setTaskManagerNumSlots(DOP); - } + } - @Override - protected void preSubmit() throws Exception { - dataPath = createTempFile("data_points.txt", DATA_POINTS); - resultPath = getTempFilePath("result"); - } + @Override + protected void preSubmit() throws Exception { + dataPath = createTempFile("data_points.txt", DATA_POINTS); + resultPath = getTempFilePath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(DATA_POINTS, resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(DATA_POINTS, resultPath); + } + @Override + protected Plan getTestJob() { + Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath); + return plan; + } - @Override - protected Plan getTestJob() { - Plan plan = getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath); - return plan; - } + @Parameters + public static Collection<Object[]> getConfigurations() { + Configuration config1 = new Configuration(); + config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP); + return toParameterList(config1); + } - @Parameters - public static Collection<Object[]> getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("ChainedMapperITCase#NoSubtasks", DOP); - return toParameterList(config1); - } + public static final class IdentityMapper extends MapFunction implements Serializable { - public static final class IdentityMapper extends MapFunction implements Serializable { + private static final long serialVersionUID = 1L; - private static final long serialVersionUID = 1L; + @Override + public void map(Record rec, Collector<Record> out) { + out.collect(rec); + } + } - @Override - public void map(Record rec, Collector<Record> out) { - out.collect(rec); - } - } + public static final class DummyReducer extends ReduceFunction implements Serializable { - public static final class DummyReducer extends ReduceFunction implements Serializable { + private static final long serialVersionUID = 1L; - private static final long serialVersionUID = 1L; + @Override + public void reduce(Iterator<Record> it, Collector<Record> out) { + while (it.hasNext()) { + out.collect(it.next()); + } + } + } - @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - while (it.hasNext()) { - out.collect(it.next()); - } - } - } + static Plan getTestPlan(int numSubTasks, String input, String output) { - static Plan getTestPlan(int numSubTasks, String input, String output) { + FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input"); + initialInput.setDegreeOfParallelism(1); - FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input"); - initialInput.setDegreeOfParallelism(1); + BulkIteration iteration = new BulkIteration("Loop"); + iteration.setInput(initialInput); + iteration.setMaximumNumberOfIterations(2); - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(initialInput); - iteration.setMaximumNumberOfIterations(2); + ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution()) + .name("Reduce something").build(); - ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0) - .input(iteration.getPartialSolution()) - .name("Reduce something") - .build(); + MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build(); + iteration.setNextPartialSolution(dummyMap); + FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output"); - MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build(); - iteration.setNextPartialSolution(dummyMap); - - FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output"); - - Plan plan = new Plan(finalResult, "Iteration with chained map test"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } + Plan plan = new Plan(finalResult, "Iteration with chained map test"); + plan.setDefaultParallelism(numSubTasks); + return plan; + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java index aa83e9b..2a130ed 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java @@ -16,43 +16,28 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.IterativeDataSet; import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat; import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat; -import org.apache.flink.test.util.RecordAPITestBase; +import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -@RunWith(Parameterized.class) -public class IterationWithUnionITCase extends RecordAPITestBase { +public class IterationWithUnionITCase extends JavaProgramTestBase { private static final String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n"; protected String dataPath; protected String resultPath; - - public IterationWithUnionITCase(Configuration config) { - super(config); - setTaskManagerNumSlots(DOP); - } @Override protected void preSubmit() throws Exception { @@ -66,54 +51,36 @@ public class IterationWithUnionITCase extends RecordAPITestBase { } @Override - protected Plan getTestJob() { - return getPlan(config.getInteger("IterationWithUnionITCase#NumSubtasks", 1), dataPath, resultPath); - } - - @Parameters - public static Collection<Object[]> getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("IterationWithUnionITCase#NumSubtasks", DOP); - - return toParameterList(config1); - } - - private static Plan getPlan(int numSubTasks, String input, String output) { - FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input"); - initialInput.setDegreeOfParallelism(1); + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(initialInput); - iteration.setMaximumNumberOfIterations(2); - - @SuppressWarnings("unchecked") - MapOperator map2 = MapOperator.builder(new IdentityMapper()).input(iteration.getPartialSolution(), iteration.getPartialSolution()).name("map").build(); + DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1); - iteration.setNextPartialSolution(map2); - - FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output"); - - Plan plan = new Plan(finalResult, "Iteration with union test"); - plan.setDefaultParallelism(numSubTasks); - return plan; + IterativeDataSet<Record> iteration = initialInput.iterate(2); + + DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper()); + + iteration.closeWith(result).write(new PointOutFormat(), this.resultPath); + + env.execute(); } - static final class IdentityMapper extends MapFunction implements Serializable { + static final class IdentityMapper implements MapFunction<Record, Record>, Serializable { private static final long serialVersionUID = 1L; @Override - public void map(Record rec, Collector<Record> out) { - out.collect(rec); + public Record map(Record rec) { + return rec; } } - static final class DummyReducer extends ReduceFunction implements Serializable { + static final class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - while (it.hasNext()) { - out.collect(it.next()); + public void reduce(Iterable<Record> it, Collector<Record> out) { + for (Record r : it) { + out.collect(r); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java index 4c8177a..9382708 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java @@ -16,34 +16,27 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.aggregators; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.aggregators.LongSumAggregator; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.apache.flink.api.java.functions.RichGroupReduceFunction; -import org.apache.flink.api.java.functions.RichJoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; - import org.junit.Assert; - import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.IterativeDataSet; - /** - * - * Connected Components test case that uses a parametrizable aggregator - * + * Connected Components test case that uses a parameterizable aggregator */ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaProgramTestBase { @@ -147,7 +140,8 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP } } - public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class NeighborWithComponentIDJoin implements JoinFunction + <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -163,24 +157,23 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; - final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); + + private final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); @Override - public void reduce(Iterator<Tuple2<Long, Long>> values, - Collector<Tuple2<Long, Long>> out) throws Exception { - - final Tuple2<Long, Long> first = values.next(); - final Long vertexId = first.f0; - Long minimumCompId = first.f1; + public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { + Long vertexId = 0L; + Long minimumCompId = Long.MAX_VALUE; - while (values.hasNext()) { - Long candidateCompId = values.next().f1; + for (Tuple2<Long, Long> value: values) { + vertexId = value.f0; + Long candidateCompId = value.f1; if (candidateCompId < minimumCompId) { minimumCompId = candidateCompId; } } - resultVertex.setField(vertexId, 0); - resultVertex.setField(minimumCompId, 1); + resultVertex.f0 = vertexId; + resultVertex.f1 = minimumCompId; out.collect(resultVertex); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java index 104c3df..039d64e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java @@ -16,11 +16,9 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.aggregators; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; @@ -158,21 +156,19 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); @Override - public void reduce(Iterator<Tuple2<Long, Long>> values, - Collector<Tuple2<Long, Long>> out) throws Exception { - - final Tuple2<Long, Long> first = values.next(); - final Long vertexId = first.f0; - Long minimumCompId = first.f1; + public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { + Long vertexId = 0L; + Long minimumCompId = Long.MAX_VALUE; - while (values.hasNext()) { - Long candidateCompId = values.next().f1; + for (Tuple2<Long, Long> value: values) { + vertexId = value.f0; + Long candidateCompId = value.f1; if (candidateCompId < minimumCompId) { minimumCompId = candidateCompId; } } - resultVertex.setField(vertexId, 0); - resultVertex.setField(minimumCompId, 1); + resultVertex.f0 = vertexId; + resultVertex.f1 = minimumCompId; out.collect(resultVertex); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java index 3060b68..4fd22a3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.apache.flink.api.common.aggregators.LongSumAggregator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -31,6 +32,8 @@ import org.apache.flink.api.java.record.functions.MapFunction; import org.apache.flink.api.java.record.io.CsvInputFormat; import org.apache.flink.api.java.record.io.CsvOutputFormat; import org.apache.flink.api.java.record.io.FileOutputFormat; +import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction; +import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory; @@ -317,7 +320,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); intermediateConfig.setDriverComparator(comparator, 0); intermediateConfig.setStubWrapper( - new UserCodeClassWrapper<MinimumComponentIDReduce>(MinimumComponentIDReduce.class)); + new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingClassReduceFunction(MinimumComponentIDReduce.class))); } return intermediate; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java index 448b7bd..5a6e4f5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java @@ -19,13 +19,12 @@ package org.apache.flink.test.iterative.nephele; import java.util.Collection; -import java.util.Iterator; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; import org.apache.flink.api.java.record.io.FileOutputFormat; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory; @@ -273,14 +272,14 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase { } } - public static final class DummyReducer extends ReduceFunction { + public static final class DummyReducer implements GroupReduceFunction<Record, Record> { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - while (it.hasNext()) { - out.collect(it.next()); + public void reduce(Iterable<Record> it, Collector<Record> out) { + for (Record r :it) { + out.collect(r); } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java index 1ec0eb4..cd6a89d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Iterator; @@ -83,11 +82,13 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i } @Override - public void coGroup(Iterator<VertexWithRankAndDangling> currentPageRankIterator, Iterator<VertexWithRank> partialRanks, + public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks, Collector<VertexWithRankAndDangling> collector) { + final Iterator<VertexWithRankAndDangling> currentPageRankIterator = currentPageRankIterable.iterator(); + if (!currentPageRankIterator.hasNext()) { - long missingVertex = partialRanks.next().getVertexID(); + long missingVertex = partialRanks.iterator().next().getVertexID(); throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!"); } @@ -95,8 +96,8 @@ public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction i long edges = 0; double summedRank = 0; - while (partialRanks.hasNext()) { - summedRank += partialRanks.next().getRank(); + for (VertexWithRank pr :partialRanks) { + summedRank += pr.getRank(); edges++; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java index d83b33b..8d92c59 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java @@ -29,6 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats; import org.apache.flink.util.Collector; +@SuppressWarnings("deprecation") public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java index 1e08a9f..8ec8403 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java @@ -35,12 +35,14 @@ public class CustomRankCombiner extends AbstractRichFunction implements GroupRed private final VertexWithRank accumulator = new VertexWithRank(); @Override - public void reduce(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception { + public void reduce(Iterable<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception { throw new UnsupportedOperationException(); } @Override - public void combine(Iterator<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception { + public void combine(Iterable<VertexWithRank> recordsIterable, Collector<VertexWithRank> out) throws Exception { + final Iterator<VertexWithRank> records = recordsIterable.iterator(); + VertexWithRank next = records.next(); this.accumulator.setVertexID(next.getVertexID()); double rank = next.getRank(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java index ab3dea9..c51cf96 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele.danglingpagerank; import java.util.Iterator; @@ -102,7 +101,8 @@ public class CompensatableDotProductCoGroup extends CoGroupFunction { long edges = 0; double summedRank = 0; while (partialRanks.hasNext()) { - summedRank += partialRanks.next().getField(1, doubleInstance).getValue(); + Record pr = partialRanks.next(); + summedRank += pr.getField(1, doubleInstance).getValue(); edges++; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index 3749c1d..d59d721 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.test.javaApiOperators; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; -import java.util.Iterator; import java.util.LinkedList; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -308,21 +307,19 @@ public class CoGroupITCase extends JavaProgramTestBase { @Override public void coGroup( - Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, - Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, - Collector<Tuple2<Integer, Integer>> out) throws Exception { - + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<Tuple2<Integer, Integer>> out) + { int sum = 0; int id = 0; - while(first.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = first.next(); + for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) { sum += element.f2; id = element.f0; } - while(second.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = second.next(); + for ( Tuple5<Integer, Long, Integer, String, Long> element : second ) { sum += element.f2; id = element.f0; } @@ -336,27 +333,22 @@ public class CoGroupITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void coGroup(Iterator<CustomType> first, - Iterator<CustomType> second, Collector<CustomType> out) - throws Exception { + public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) { CustomType o = new CustomType(0,0,"test"); - while(first.hasNext()) { - CustomType element = first.next(); + for ( CustomType element : first ) { o.myInt = element.myInt; o.myLong += element.myLong; } - while(second.hasNext()) { - CustomType element = second.next(); + for ( CustomType element : second ) { o.myInt = element.myInt; o.myLong += element.myLong; } out.collect(o); } - } public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { @@ -365,21 +357,19 @@ public class CoGroupITCase extends JavaProgramTestBase { @Override public void coGroup( - Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, - Iterator<CustomType> second, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<CustomType> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception { long sum = 0; int id = 0; - while(first.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = first.next(); + for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) { sum += element.f0; id = element.f2; } - while(second.hasNext()) { - CustomType element = second.next(); + for (CustomType element : second) { id = element.myInt; sum += element.myLong; } @@ -394,20 +384,18 @@ public class CoGroupITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void coGroup(Iterator<CustomType> first, - Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, - Collector<CustomType> out) throws Exception { - + public void coGroup(Iterable<CustomType> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<CustomType> out) + { CustomType o = new CustomType(0,0,"test"); - while(first.hasNext()) { - CustomType element = first.next(); + for (CustomType element : first) { o.myInt = element.myInt; o.myLong += element.myLong; } - while(second.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = second.next(); + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { o.myInt = element.f2; o.myLong += element.f0; } @@ -423,14 +411,14 @@ public class CoGroupITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void coGroup(Iterator<Tuple3<Integer, Long, String>> first, - Iterator<Tuple3<Integer, Long, String>> second, - Collector<Tuple3<Integer, Long, String>> out) throws Exception { - - while(first.hasNext()) { - Tuple3<Integer, Long, String> element = first.next(); - if(element.f0 < 6) + public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, + Iterable<Tuple3<Integer, Long, String>> second, + Collector<Tuple3<Integer, Long, String>> out) + { + for (Tuple3<Integer, Long, String> element : first) { + if(element.f0 < 6) { out.collect(element); + } } } } @@ -441,20 +429,16 @@ public class CoGroupITCase extends JavaProgramTestBase { @Override public void coGroup( - Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, - Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) - throws Exception { - - while(second.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = second.next(); - if(element.f0 < 4) + { + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { + if(element.f0 < 4) { out.collect(element); + } } - } - - } public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { @@ -477,21 +461,19 @@ public class CoGroupITCase extends JavaProgramTestBase { @Override public void coGroup( - Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, - Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, - Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception { - + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<Tuple3<Integer, Integer, Integer>> out) + { int sum = 0; int id = 0; - while(first.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = first.next(); + for (Tuple5<Integer, Long, Integer, String, Long> element : first) { sum += element.f2; id = element.f0; } - while(second.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> element = second.next(); + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { sum += element.f2; id = element.f0; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 7376e86..bd10c5e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -422,16 +422,13 @@ public class GroupReduceITCase extends JavaProgramTestBase { public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { private static final long serialVersionUID = 1L; - @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple2<Integer, Long>> out) throws Exception { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) { int i = 0; long l = 0l; - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for (Tuple3<Integer, Long, String> t : values) { i += t.f0; l = t.f1; } @@ -446,24 +443,22 @@ public class GroupReduceITCase extends JavaProgramTestBase { @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple3<Integer, Long, String>> out) throws Exception { - - Tuple3<Integer, Long, String> t = values.next(); - - int sum = t.f0; - long key = t.f1; - String concat = t.f2; + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { + int sum = 0; + long key = 0; + StringBuilder concat = new StringBuilder(); - while(values.hasNext()) { - t = values.next(); - - sum += t.f0; - concat += "-"+t.f2; + for (Tuple3<Integer, Long, String> next : values) { + sum += next.f0; + key = next.f1; + concat.append(next.f2).append("-"); } - out.collect(new Tuple3<Integer, Long, String>(sum, key, concat)); + if (concat.length() > 0) { + concat.setLength(concat.length() - 1); + } + out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString())); } } @@ -472,16 +467,14 @@ public class GroupReduceITCase extends JavaProgramTestBase { @Override public void reduce( - Iterator<Tuple5<Integer, Long, Integer, String, Long>> values, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) - throws Exception { - + { int i = 0; long l = 0l; long l2 = 0l; - while(values.hasNext()) { - Tuple5<Integer, Long, Integer, String, Long> t = values.next(); + for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) { i = t.f0; l += t.f1; l2 = t.f4; @@ -496,20 +489,19 @@ public class GroupReduceITCase extends JavaProgramTestBase { @Override - public void reduce(Iterator<CustomType> values, - Collector<CustomType> out) throws Exception { + public void reduce(Iterable<CustomType> values, Collector<CustomType> out) { + final Iterator<CustomType> iter = values.iterator(); CustomType o = new CustomType(); - CustomType c = values.next(); + CustomType c = iter.next(); o.myString = "Hello!"; o.myInt = c.myInt; o.myLong = c.myLong; - while(values.hasNext()) { - c = values.next(); - o.myLong += c.myLong; - + while (iter.hasNext()) { + CustomType next = iter.next(); + o.myLong += next.myLong; } out.collect(o); @@ -522,11 +514,9 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple3<Integer, Long, String>> out) throws Exception { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { if(t.f0 < 4) { t.f2 = "Hi!"; @@ -544,14 +534,12 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple3<Integer, Long, String>> out) throws Exception { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { int i = 0; long l = 0l; - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0; l += t.f1; } @@ -564,21 +552,13 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<CustomType> values, - Collector<CustomType> out) throws Exception { + public void reduce(Iterable<CustomType> values, Collector<CustomType> out) { - CustomType o = new CustomType(); - CustomType c = values.next(); - - o.myString = "Hello!"; - o.myInt = c.myInt; - o.myLong = c.myLong; - + CustomType o = new CustomType(0, 0, "Hello!"); - while(values.hasNext()) { - c = values.next(); - o.myInt += c.myInt; - o.myLong += c.myLong; + for (CustomType next : values) { + o.myInt += next.myInt; + o.myLong += next.myLong; } out.collect(o); @@ -602,14 +582,12 @@ public class GroupReduceITCase extends JavaProgramTestBase { } @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple3<Integer, Long, String>> out) throws Exception { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { int i = 0; long l = 0l; - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0; l = t.f1; } @@ -624,12 +602,11 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception { + public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, ""); - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { o.f0 += t.f0; o.f1 = t.f1; o.f2 = "test"+o.f1; @@ -639,14 +616,12 @@ public class GroupReduceITCase extends JavaProgramTestBase { } @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, - Collector<Tuple2<Integer, String>> out) throws Exception { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) { int i = 0; String s = ""; - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0; s = t.f2; } @@ -661,12 +636,11 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { + public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, ""); - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { o.f0 += t.f0; o.f1 += t.f1; o.f2 += "test"; @@ -676,13 +650,12 @@ public class GroupReduceITCase extends JavaProgramTestBase { } @Override - public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) { + public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) { int i = 0; String s = ""; - while(values.hasNext()) { - Tuple3<Integer, Long, String> t = values.next(); + for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0 + t.f1; s += t.f2; } @@ -697,12 +670,11 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static final long serialVersionUID = 1L; @Override - public void combine(Iterator<CustomType> values, Collector<CustomType> out) throws Exception { + public void combine(Iterable<CustomType> values, Collector<CustomType> out) throws Exception { CustomType o = new CustomType(); - while(values.hasNext()) { - CustomType c = values.next(); + for ( CustomType c : values ) { o.myInt = c.myInt; o.myLong += c.myLong; o.myString = "test"+c.myInt; @@ -712,13 +684,11 @@ public class GroupReduceITCase extends JavaProgramTestBase { } @Override - public void reduce(Iterator<CustomType> values, - Collector<CustomType> out) throws Exception { + public void reduce(Iterable<CustomType> values, Collector<CustomType> out) { CustomType o = new CustomType(0, 0, ""); - while(values.hasNext()) { - CustomType c = values.next(); + for ( CustomType c : values) { o.myInt = c.myInt; o.myLong += c.myLong; o.myString = c.myString; @@ -733,6 +703,5 @@ public class GroupReduceITCase extends JavaProgramTestBase { @Override public T map(T value) { return value; } - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java index 5a895d3..865c550 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.operators; import java.io.FileNotFoundException; @@ -26,8 +25,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.record.functions.CoGroupFunction; import org.apache.flink.api.java.record.io.DelimitedInputFormat; @@ -46,13 +43,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -/** - */ @RunWith(Parameterized.class) public class CoGroupITCase extends RecordAPITestBase { - private static final Log LOG = LogFactory.getLog(CoGroupITCase.class); - String leftInPath = null; String rightInPath = null; String resultPath = null; @@ -89,8 +82,6 @@ public class CoGroupITCase extends RecordAPITestBase { target.setField(0, keyString); target.setField(1, valueString); - LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]"); - return target; } @@ -112,9 +103,6 @@ public class CoGroupITCase extends RecordAPITestBase { this.buffer.append('\n'); byte[] bytes = this.buffer.toString().getBytes(); - - LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]"); - this.stream.write(bytes); } } @@ -124,33 +112,28 @@ public class CoGroupITCase extends RecordAPITestBase { private StringValue keyString = new StringValue(); private StringValue valueString = new StringValue(); - private Record record = new Record(); @Override public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) { - // TODO Auto-generated method stub + Record record = null; int sum = 0; - LOG.debug("Start iterating over input1"); + while (records1.hasNext()) { record = records1.next(); keyString = record.getField(0, keyString); valueString = record.getField(1, valueString); sum += Integer.parseInt(valueString.getValue()); - - LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]"); } - LOG.debug("Start iterating over input2"); + + while (records2.hasNext()) { record = records2.next(); keyString = record.getField(0, keyString); valueString = record.getField(1, valueString); sum -= Integer.parseInt(valueString.getValue()); - - LOG.debug("Processed: [" + keyString.getValue() + "," + valueString.getValue() + "]"); } record.setField(1, new IntValue(sum)); - LOG.debug("Finished"); out.collect(record); } @@ -197,9 +180,9 @@ public class CoGroupITCase extends RecordAPITestBase { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE}; + String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE }; - String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, }; + String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH }; for (String localStrategy : localStrategies) { for (String shipStrategy : shipStrategies) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java index b06cf47..2711417 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java @@ -16,11 +16,8 @@ * limitations under the License. */ - package org.apache.flink.test.operators; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.record.functions.ReduceFunction; import org.apache.flink.api.java.record.io.DelimitedInputFormat; @@ -53,8 +50,6 @@ import java.util.LinkedList; @RunWith(Parameterized.class) public class ReduceITCase extends RecordAPITestBase { - - private static final Log LOG = LogFactory.getLog(ReduceITCase.class); String inPath = null; String resultPath = null; @@ -83,17 +78,14 @@ public class ReduceITCase extends RecordAPITestBase { private StringValue combineValue = new StringValue(); @Override - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - + public void combine(Iterator<Record> records, Collector<Record> out) { + Record record = null; int sum = 0; - Record record = new Record(); + while (records.hasNext()) { record = records.next(); combineValue = record.getField(1, combineValue); sum += Integer.parseInt(combineValue.toString()); - - LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() + - "," + combineValue.toString() + "]"); } combineValue.setValue(sum + ""); record.setField(1, combineValue); @@ -101,17 +93,14 @@ public class ReduceITCase extends RecordAPITestBase { } @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - + public void reduce(Iterator<Record> records, Collector<Record> out) { + Record record = null; int sum = 0; - Record record = new Record(); + while (records.hasNext()) { record = records.next(); reduceValue = record.getField(1, reduceValue); sum += Integer.parseInt(reduceValue.toString()); - - LOG.debug("Processed: [" + record.getField(0, StringValue.class).toString() + - "," + reduceValue.toString() + "]"); } record.setField(1, new IntValue(sum)); out.collect(record); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72d7b862/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java index 60b7512..0b84309 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/GroupOrderReduceITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import java.io.Serializable;
