http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java index f42eb02..41e0eb9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(0,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,1).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(2,1).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(1,2).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase { JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST) .where(0,2).equalTo(1,2).with(new MockJoin()); - joined.print(); + joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .coGroup(set2) .where(0,1).equalTo(0,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .coGroup(set2) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .coGroup(set2) .where(0,1).equalTo(0,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .coGroup(set2) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("2;1")) .where(0,1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .coGroup(set2) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("2")) .where(0,1).equalTo(2,1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("0;1")) .where(0, 1).equalTo(0, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("1;2")) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("2;1")) .where(0, 1).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("1")) .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("1")) .where(0, 2).equalTo(2, 1).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("2")) .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase { .withForwardedFields("1")) .where(0, 2).equalTo(1, 2).with(new MockCoGroup()); - coGrouped.print(); + coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java index 84f6377..68e8a41 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java @@ -20,6 +20,8 @@ package org.apache.flink.optimizer; import static org.junit.Assert.*; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.testfunctions.SelectOneReducer; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase { .map(new IdentityMapper<Long>()) .withBroadcastSet(source, "bc"); - result.print(); + result.output(new DiscardingOutputFormat<Long>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase { .withBroadcastSet(bcInput1, "bc1") .withBroadcastSet(bcInput2, "bc2"); - result.print(); + result.output(new DiscardingOutputFormat<Long>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase { .withBroadcastSet(bcInput1, "bc1"); - iteration.closeWith(result).print(); + iteration.closeWith(result).output(new DiscardingOutputFormat<Long>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase { initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); Plan p = env.createProgramPlan(); @@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase { initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); Plan p = env.createProgramPlan(); @@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase { initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); Plan p = env.createProgramPlan(); @@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase { initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Long,Long>>()); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java index d6b9444..dc9f2e5 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; @@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy(0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long,String>>()); JavaPlan plan = env.createProgramPlan(); @@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy(1, 0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long,String>>()); JavaPlan plan = env.createProgramPlan(); @@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy("*"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>()); JavaPlan plan = env.createProgramPlan(); @@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy("f1"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>()); JavaPlan plan = env.createProgramPlan(); @@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy("f1.stringField"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy("f1.intField; f2"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data.getSplitDataProperties() .splitsPartitionedBy("byDate", 1, 0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(0) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(0) .splitsGroupedBy(1, 0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(1) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(0, 1) .splitsGroupedBy(0); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f2") .splitsGroupedBy("f2"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1.intField") .splitsGroupedBy("f0; f1.intField"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1.intField") .splitsGroupedBy("f1"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1") .splitsGroupedBy("f1.stringField"); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(1) .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(1) .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(0) .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy(0, 1) .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1.intField") .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1.intField") .splitsOrderedBy("f1", new Order[]{Order.DESCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { .splitsPartitionedBy("f1") .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING}); - data.print(); + data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data2.getSplitDataProperties() .splitsPartitionedBy("byDate", 0); - data1.union(data2).print(); + data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); @@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase { data2.getSplitDataProperties() .splitsPartitionedBy("byDate", 0); - data1.union(data2).print(); + data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>()); JavaPlan plan = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java index 3af64fc..b0dca66 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java @@ -20,36 +20,35 @@ package org.apache.flink.optimizer; import static org.junit.Assert.fail; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; import org.junit.Test; /** * This test case has been created to validate a bug that occurred when * the ReduceOperator was used without a grouping key. */ -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class ReduceAllTest extends CompilerTestBase { @Test public void testReduce() { // construct the plan - FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source"); - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build(); - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink"); - sink.setInput(reduce1); - Plan plan = new Plan(sink, "AllReduce Test"); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet<Long> set1 = env.generateSequence(0,1); + + set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1") + .output(new DiscardingOutputFormat<Long>()).name("Sink"); + + JavaPlan plan = env.createProgramPlan(); + try { OptimizedPlan oPlan = compileNoStats(plan); JobGraphGenerator jobGen = new JobGraphGenerator(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java index 25643a4..26af380 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java @@ -44,7 +44,7 @@ import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial", "unchecked"}) public class ReplicatingDataSourceTest extends CompilerTestBase { /** http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java index 3a24ce1..00ada2a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { .groupBy(1) .reduce(new MockReducer()).withForwardedFields("*"); - set.print(); + set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); @@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { .reduce(new MockReducer()).withForwardedFields("f1->f2"); DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin()); - out.print(); + out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileWithStats(plan); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java index 65e5025..a94f845 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java @@ -20,6 +20,7 @@ package org.apache.flink.optimizer; import static org.junit.Assert.*; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase { .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) - - .print(); + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()) + + .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase { .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(0, 1) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2") .groupBy(1) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) - - .print(); + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()) + + .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java index 1001626..f041b2a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.optimizer.plan.BinaryUnionPlanNode; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; @@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase { DataSet<Long> result = iteration.closeWith( input2.union(input2).union(iteration.union(iteration))); - result.print(); - result.print(); + result.output(new DiscardingOutputFormat<Long>()); + result.output(new DiscardingOutputFormat<Long>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase { DataSet<Long> iterResult = iteration .closeWith(iteration.union(iteration).union(input2.union(input2))); - iterResult.print(); - iterResult.print(); + iterResult.output(new DiscardingOutputFormat<Long>()); + iterResult.output(new DiscardingOutputFormat<Long>()); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java index 2e52565..fee6e17 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java @@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.translation.JavaPlan; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.IntValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Visitor; import org.junit.Assert; @@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.PlanNode; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityReduce; -@SuppressWarnings({"serial", "deprecation"}) +@SuppressWarnings({"serial"}) public class UnionPropertyPropagationTest extends CompilerTestBase { - @SuppressWarnings("unchecked") @Test - public void testUnionPropertyOldApiPropagation() { + public void testUnion1() { // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet<Long> sourceA = env.generateSequence(0,1); + DataSet<Long> sourceB = env.generateSequence(0,1); - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE); - - ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceA) - .build(); - ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(sourceB) - .build(); - - ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build(); - globalRed.addInput(redA); - globalRed.addInput(redB); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed); - - // return the plan - Plan plan = new Plan(sink, "Union Property Propagation"); + DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()); + DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()); + + redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()) + .output(new DiscardingOutputFormat<Long>()); + + JavaPlan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); @@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { @Override public boolean preVisit(PlanNode visitable) { - if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) { + if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) { for (Channel inConn : visitable.getInputs()) { Assert.assertTrue("Reduce should just forward the input if it is already partitioned", inConn.getShipStrategy() == ShipStrategyType.FORWARD); @@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { } @Test - public void testUnionNewApiAssembly() { + public void testUnion2() { final int NUM_INPUTS = 4; // construct the plan it will be multiple flat maps, all unioned http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java index e81e0ec..65dd2b3 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java @@ -21,6 +21,7 @@ package org.apache.flink.optimizer; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase { DataSet<String> union = input1.union(input2); - union.print(); - union.print(); + union.output(new DiscardingOutputFormat<String>()); + union.output(new DiscardingOutputFormat<String>()); Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java index 321ca5a..32bd6e9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java @@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>()); + DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class); DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1); - DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>()); - iteration.closeWith(iterEnd, iterEnd).print(); + DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>()); + iteration.closeWith(iterEnd, iterEnd) + .output(new DiscardingOutputFormat<Tuple2<Long,Long>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase { fail(e.getMessage()); } } - - private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> { - @Override - public Tuple2<T, T> map(T value) { - return new Tuple2<T, T>(value, value); - } - } - - private static final class TestMapper<T> implements MapFunction<T, T> { - @Override - public T map(T value) { - return value; - } - } + } http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java index 27f367f..46b9357 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java @@ -25,25 +25,21 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.JoinOperator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityJoiner; +import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.types.LongValue; import org.junit.Test; @@ -51,7 +47,6 @@ import org.junit.Test; * Tests that validate optimizer choices when using operators that are requesting certain specific execution * strategies. */ -@SuppressWarnings("deprecation") public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { private static final long serialVersionUID = 1L; @@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { @Test public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() { - Plan plan = getRecordTestPlan(false, true); + Plan plan = getTestPlan(false, true); OptimizedPlan oPlan; try { @@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { @Test public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() { - Plan plan = getRecordTestPlan(false, false); + Plan plan = getTestPlan(false, false); OptimizedPlan oPlan; try { @@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { @Test public void testRecordApiWithDirectSoltionSetUpdate() { - Plan plan = getRecordTestPlan(true, false); + Plan plan = getTestPlan(true, false); OptimizedPlan oPlan; try { @@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { new JobGraphGenerator().compileJobGraph(oPlan); } - private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { - FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set"); - FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset"); - - FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input"); - - DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME); - iteration.setInitialSolutionSet(solutionSetInput); - iteration.setInitialWorkset(worksetInput); - iteration.setMaximumNumberOfIterations(100); - - JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0) - .input1(iteration.getWorkset()) - .input2(invariantInput) - .name(JOIN_WITH_INVARIANT_NAME) - .build(); - - JoinOperator joinWithSolutionSet = JoinOperator.builder( - joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0) - .input1(iteration.getSolutionSet()) - .input2(joinWithInvariant) - .name(JOIN_WITH_SOLUTION_SET) - .build(); - - ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0) - .input(joinWithSolutionSet) - .name(NEXT_WORKSET_REDUCER_NAME) - .build(); - - if (mapBeforeSolutionDelta) { - MapOperator mapper = MapOperator.builder(new IdentityMap()) - .input(joinWithSolutionSet) - .name(SOLUTION_DELTA_MAPPER_NAME) - .build(); - iteration.setSolutionSetDelta(mapper); - } else { - iteration.setSolutionSetDelta(joinWithSolutionSet); + private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { + + // construct the plan + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set"); + DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset"); + DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input"); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME); + + DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0) + .with(new IdentityJoiner<Tuple2<Long, Long>>()) + .withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME); + + DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0) + .with(new IdentityJoiner<Tuple2<Long, Long>>()) + .name(JOIN_WITH_SOLUTION_SET); + if(joinPreservesSolutionSet) { + ((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*"); } - - iteration.setNextWorkset(nextWorkset); - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - return plan; + DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>()) + .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME); + + if(mapBeforeSolutionDelta) { + + DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>()) + .withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME); + + deltaIt.closeWith(mapper, nextWorkset) + .output(new DiscardingOutputFormat<Tuple2<Long,Long>>()); + } + else { + deltaIt.closeWith(join2, nextWorkset) + .output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); + } + + return env.createProgramPlan(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java index d52181d..346e702 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java @@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase { .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); grouped .coGroup(partitioned).where(0).equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java index 5758c86..17a7659 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java @@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase { .withPartitioner(partitioner); joined.groupBy(1).withPartitioner(partitioner) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()) .print(); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java index 0408ca9..23f4812 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java @@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase { data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>()) .withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>()) .print(); Plan p = env.createProgramPlan(); @@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase { data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>()) .withPartitioner(new TestPartitionerInt()) .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>()) .print(); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java index 74e5c8c..54033ac 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java @@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase { .rebalance().setParallelism(4); data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Pojo2>()) + .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>()) .print(); Plan p = env.createProgramPlan(); @@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase { data.groupBy("a").withPartitioner(new TestPartitionerInt()) .sortGroup("b", Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Pojo3>()) + .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>()) .print(); Plan p = env.createProgramPlan(); @@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase { data.groupBy("a").withPartitioner(new TestPartitionerInt()) .sortGroup("b", Order.ASCENDING) .sortGroup("c", Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer<Pojo4>()) + .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>()) .print(); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java index 72fb81b..49f44f5 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java @@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase { .rebalance().setParallelism(4); data.groupBy(0).withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>()) .print(); Plan p = env.createProgramPlan(); @@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>()) .print(); Plan p = env.createProgramPlan(); @@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) .sortGroup(2, Order.DESCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>()) .print(); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java index 8eedee1..ff429b8 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java @@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase { .distinct(0, 1) .groupBy(1) .sortGroup(0, Order.ASCENDING) - .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); + .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1"); grouped .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java index 95ee4de..9c2d0d2 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase { public int partition(Long key, int numPartitions) { return key.intValue(); } }, 1) .groupBy(1) - .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>()) + .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>()) .print(); Plan p = env.createProgramPlan(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java new file mode 100644 index 0000000..9d8ac2e --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.util.Collector; + +public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java new file mode 100644 index 0000000..54b2785 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.CrossFunction; + +public class IdentityCrosser<T> implements CrossFunction<T, T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public T cross(T first, T second) { + return first; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java index 11fd044..da4ef17 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java @@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; - -@Combinable public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java new file mode 100644 index 0000000..ce24bb6 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.util.Collector; + +@Combinable +public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<T> values, Collector<T> out) { + for (T next : values) { + out.collect(next); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java new file mode 100644 index 0000000..faca2ce --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.JoinFunction; + +public class IdentityJoiner<T> implements JoinFunction<T, T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public T join(T first, T second) { + return first; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java deleted file mode 100644 index 6a84c44..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.java.record.functions.CoGroupFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -public class DummyCoGroupStub extends CoGroupFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) { - while (records1.hasNext()) { - out.collect(records1.next()); - } - - while (records2.hasNext()) { - out.collect(records2.next()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java deleted file mode 100644 index 8ee2285..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import org.apache.flink.api.java.record.functions.CrossFunction; -import org.apache.flink.types.Record; - -@SuppressWarnings("deprecation") -public class DummyCrossStub extends CrossFunction { - private static final long serialVersionUID = 1L; - - @Override - public Record cross(Record first, Record second) throws Exception { - return first; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java deleted file mode 100644 index 0c816e7..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.record.io.DelimitedInputFormat; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; - -public final class DummyInputFormat extends DelimitedInputFormat { - private static final long serialVersionUID = 1L; - - private final IntValue integer = new IntValue(1); - - @Override - public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) { - target.setField(0, this.integer); - target.setField(1, this.integer); - return target; - } - - @Override - public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) { - return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java deleted file mode 100644 index d00be6e..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; - -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -@ConstantFieldsFirstExcept({}) -public class DummyMatchStub extends JoinFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record value1, Record value2, Collector<Record> out) throws Exception { - out.collect(value1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java deleted file mode 100644 index 444b48e..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; - -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record value1, Record value2, Collector<Record> out) throws Exception { - out.collect(value1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java deleted file mode 100644 index 1bbe24c..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.util; - - -import org.apache.flink.api.java.record.io.DelimitedOutputFormat; -import org.apache.flink.types.Record; - - -public final class DummyOutputFormat extends DelimitedOutputFormat { - private static final long serialVersionUID = 1L; - - @Override - public int serializeRecord(Record rec, byte[] target) throws Exception { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java deleted file mode 100644 index cccc6cb..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; - -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -@ConstantFieldsExcept({}) -public final class IdentityMap extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> out) throws Exception { - out.collect(record); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java deleted file mode 100644 index f45745d..0000000 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -@SuppressWarnings("deprecation") -@ConstantFieldsExcept({}) -public final class IdentityReduce extends ReduceFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - while (records.hasNext()) { - out.collect(records.next()); - } - } -}