Repository: flink Updated Branches: refs/heads/master 1b6903216 -> 633b0d6a9
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java new file mode 100644 index 0000000..8720aa7 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.java; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.util.Collector; +import org.junit.Test; + +/** +* Tests that validate optimizer choices when using operators that are requesting certain specific execution +* strategies. +*/ +@SuppressWarnings("serial") +public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { + + private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant"; + private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet"; + private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset"; + private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta"; + + @Test + public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() { + try { + Plan plan = getJavaTestPlan(false, true); + + OptimizedPlan oPlan = compileNoStats(plan); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); + DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME); + DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET); + SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME); + SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME); + + // iteration preserves partitioning in reducer, so the first partitioning is out of the loop, + // the in-loop partitioning is before the final reducer + + // verify joinWithInvariant + assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2()); + + // verify joinWithSolutionSet + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1()); + + + // verify reducer + assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy()); + assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0)); + + // currently, the system may partition before or after the mapper + ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy(); + ShipStrategyType ss2 = deltaMapper.getOutgoingChannels().get(0).getShipStrategy(); + + assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) || + (ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) ); + + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test errored: " + e.getMessage()); + } + } + + @Test + public void testJavaApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() { + try { + Plan plan = getJavaTestPlan(false, false); + + OptimizedPlan oPlan = compileNoStats(plan); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); + DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME); + DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET); + SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME); + + // iteration preserves partitioning in reducer, so the first partitioning is out of the loop, + // the in-loop partitioning is before the final reducer + + // verify joinWithInvariant + assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2()); + + // verify joinWithSolutionSet + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1()); + + // verify reducer + assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy()); + assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0)); + + // verify solution delta + assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size()); + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy()); + + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test errored: " + e.getMessage()); + } + } + + @Test + public void testJavaApiWithDirectSoltionSetUpdate() { + try { + Plan plan = getJavaTestPlan(true, false); + + OptimizedPlan oPlan = compileNoStats(plan); + + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); + DualInputPlanNode joinWithInvariantNode = resolver.getNode(JOIN_WITH_INVARIANT_NAME); + DualInputPlanNode joinWithSolutionSetNode = resolver.getNode(JOIN_WITH_SOLUTION_SET); + SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME); + + // iteration preserves partitioning in reducer, so the first partitioning is out of the loop, + // the in-loop partitioning is before the final reducer + + // verify joinWithInvariant + assertEquals(ShipStrategyType.FORWARD, joinWithInvariantNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariantNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput1()); + assertEquals(new FieldList(1, 2), joinWithInvariantNode.getKeysForInput2()); + + // verify joinWithSolutionSet + assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy()); + assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy()); + assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1()); + + // verify reducer + assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy()); + assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0)); + + + // verify solution delta + assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size()); + assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); + + new JobGraphGenerator().compileJobGraph(oPlan); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test errored: " + e.getMessage()); + } + } + + + @Test + public void testRejectPlanIfSolutionSetKeysAndJoinKeysDontMatch() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set"); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset"); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input"); + + DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2); + + + DataSet<Tuple3<Long, Long, Long>> result = + + iter.getWorkset().join(invariantInput) + .where(1, 2) + .equalTo(1, 2) + .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { + public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { + return first; + } + }); + + try { + result.join(iter.getSolutionSet()) + .where(1, 0) + .equalTo(0, 2) + .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { + public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { + return second; + } + }); + fail("The join should be rejected with key type mismatches."); + } + catch (InvalidProgramException e) { + // expected! + } + + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test errored: " + e.getMessage()); + } + } + + private Plan getJavaTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> solutionSetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Solution Set"); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> worksetInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Workset"); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> invariantInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Invariant Input"); + + DeltaIteration<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> iter = solutionSetInput.iterateDelta(worksetInput, 100, 1, 2); + + + DataSet<Tuple3<Long, Long, Long>> joinedWithSolutionSet = + + iter.getWorkset().join(invariantInput) + .where(1, 2) + .equalTo(1, 2) + .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { + public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { + return first; + } + }) + .name(JOIN_WITH_INVARIANT_NAME) + + .join(iter.getSolutionSet()) + .where(1, 0) + .equalTo(1, 2) + .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { + public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { + return second; + } + }) + .name(JOIN_WITH_SOLUTION_SET) + .withForwardedFieldsSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null); + + DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2) + .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() { + public void reduce(Iterable<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {} + }) + .name(NEXT_WORKSET_REDUCER_NAME) + .withForwardedFields("1->1","2->2","0->0"); + + + DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ? + joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } }) + .name(SOLUTION_DELTA_MAPPER_NAME).withForwardedFields("0->0","1->1","2->2") : + joinedWithSolutionSet; + + iter.closeWith(nextSolutionSet, nextWorkset) + .print(); + + return env.createProgramPlan(); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java new file mode 100644 index 0000000..23f8897 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.junit.Test; + +@SuppressWarnings("serial") +public class CoGroupGlobalPropertiesCompatibilityTest { + + @Test + public void checkCompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1, 4); + final FieldList keysRight = new FieldList(3, 1); + + CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); + + // test compatible hash partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setHashPartitioned(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setHashPartitioned(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setHashPartitioned(keysRight); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test compatible custom partitioning + { + Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setCustomPartitioned(keysLeft, part); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setCustomPartitioned(keysRight, part); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test custom partitioning matching any partitioning + { + Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void checkInompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1); + final FieldList keysRight = new FieldList(3); + + final Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + final Partitioner<Object> part2 = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight); + + // test incompatible hash with custom partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test incompatible custom partitionings + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part2); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java new file mode 100644 index 0000000..e7807c9 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase { + + @Test + public void testRejectCoGroupOnHashAndRangePartitioning() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + + Configuration cfg = new Configuration(); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE); + + input.coGroup(input).where(0).equalTo(0) + .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>()) + .withParameters(cfg) + .print(); + + Plan p = env.createProgramPlan(); + try { + compileNoStats(p); + fail("This should fail with an exception"); + } + catch (CompilerException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java new file mode 100644 index 0000000..839f0a1 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.junit.Test; + +@SuppressWarnings("serial") +public class JoinGlobalPropertiesCompatibilityTest { + + @Test + public void checkCompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1, 4); + final FieldList keysRight = new FieldList(3, 1); + + SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight); + + // test compatible hash partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setHashPartitioned(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setHashPartitioned(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setHashPartitioned(keysRight); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test compatible custom partitioning + { + Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setCustomPartitioned(keysLeft, part); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setCustomPartitioned(keysRight, part); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test custom partitioning matching any partitioning + { + Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void checkInompatiblePartitionings() { + try { + final FieldList keysLeft = new FieldList(1); + final FieldList keysRight = new FieldList(3); + + final Partitioner<Object> part = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + final Partitioner<Object> part2 = new Partitioner<Object>() { + @Override + public int partition(Object key, int numPartitions) { + return 0; + } + }; + + SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight); + + // test incompatible hash with custom partitioning + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setHashPartitioned(keysLeft); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + + // test incompatible custom partitionings + { + RequestedGlobalProperties reqLeft = new RequestedGlobalProperties(); + reqLeft.setAnyPartitioning(keysLeft); + RequestedGlobalProperties reqRight = new RequestedGlobalProperties(); + reqRight.setAnyPartitioning(keysRight); + + GlobalProperties propsLeft = new GlobalProperties(); + propsLeft.setCustomPartitioned(keysLeft, part); + GlobalProperties propsRight = new GlobalProperties(); + propsRight.setCustomPartitioned(keysRight, part2); + + assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight)); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java new file mode 100644 index 0000000..9171cc7 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.CompilerTestBase; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +@SuppressWarnings({"serial", "unchecked"}) +public class JoinOnConflictingPartitioningsTest extends CompilerTestBase { + + @Test + public void testRejectJoinOnHashAndRangePartitioning() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); + + Configuration cfg = new Configuration(); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE); + + input.join(input).where(0).equalTo(0) + .withParameters(cfg) + .print(); + + Plan p = env.createProgramPlan(); + try { + compileNoStats(p); + fail("This should fail with an exception"); + } + catch (CompilerException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java new file mode 100644 index 0000000..2c1574b --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plan/ChannelTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.plan; + +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.OperatorInformation; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.optimizer.dag.DataSourceNode; +import org.apache.flink.core.fs.Path; +import org.junit.Assert; +import org.junit.Test; +import org.apache.flink.api.java.io.TextInputFormat; + +public class ChannelTest { + + @Test + public void testGetEstimatesNoReplicationFactor() { + final long NUM_RECORD = 1001; + final long SIZE = 467131; + + DataSourceNode source = getSourceNode(); + SourcePlanNode planNode = new SourcePlanNode(source, "test node"); + Channel channel = new Channel(planNode); + + // no estimates here + Assert.assertEquals(-1, channel.getEstimatedOutputSize()); + Assert.assertEquals(-1, channel.getEstimatedNumRecords()); + + // set estimates + source.setEstimatedNumRecords(NUM_RECORD); + source.setEstimatedOutputSize(SIZE); + Assert.assertEquals(SIZE, channel.getEstimatedOutputSize()); + Assert.assertEquals(NUM_RECORD, channel.getEstimatedNumRecords()); + } + + @Test + public void testGetEstimatesWithReplicationFactor() { + final long NUM_RECORD = 1001; + final long SIZE = 467131; + + final int REPLICATION = 23; + + DataSourceNode source = getSourceNode(); + SourcePlanNode planNode = new SourcePlanNode(source, "test node"); + Channel channel = new Channel(planNode); + channel.setReplicationFactor(REPLICATION); + + // no estimates here + Assert.assertEquals(-1, channel.getEstimatedOutputSize()); + Assert.assertEquals(-1, channel.getEstimatedNumRecords()); + + // set estimates + source.setEstimatedNumRecords(NUM_RECORD); + source.setEstimatedOutputSize(SIZE); + Assert.assertEquals(SIZE * REPLICATION, channel.getEstimatedOutputSize()); + Assert.assertEquals(NUM_RECORD * REPLICATION, channel.getEstimatedNumRecords()); + } + + +// private static final OptimizerNode getSingleInputNode() { +// return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>( +// new IdentityMapper<String>(), +// new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), +// "map")); +// } + + private static final DataSourceNode getSourceNode() { + return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>( + new TextInputFormat(new Path("/ignored")), + new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO), + "source")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java new file mode 100644 index 0000000..366d10d --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plandump/NumberFormattingTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.plandump; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class NumberFormattingTest { + + @Test + public void testFormatNumberNoDigit() { + assertEquals("0.0", PlanJSONDumpGenerator.formatNumber(0)); + assertEquals("0.00", PlanJSONDumpGenerator.formatNumber(0.0000000001)); + assertEquals("-1.0", PlanJSONDumpGenerator.formatNumber(-1.0)); + assertEquals("1.00", PlanJSONDumpGenerator.formatNumber(1)); + assertEquals("17.00", PlanJSONDumpGenerator.formatNumber(17)); + assertEquals("17.44", PlanJSONDumpGenerator.formatNumber(17.44)); + assertEquals("143.00", PlanJSONDumpGenerator.formatNumber(143)); + assertEquals("143.40", PlanJSONDumpGenerator.formatNumber(143.4)); + assertEquals("143.50", PlanJSONDumpGenerator.formatNumber(143.5)); + assertEquals("143.60", PlanJSONDumpGenerator.formatNumber(143.6)); + assertEquals("143.45", PlanJSONDumpGenerator.formatNumber(143.45)); + assertEquals("143.55", PlanJSONDumpGenerator.formatNumber(143.55)); + assertEquals("143.65", PlanJSONDumpGenerator.formatNumber(143.65)); + assertEquals("143.66", PlanJSONDumpGenerator.formatNumber(143.655)); + + assertEquals("1.13 K", PlanJSONDumpGenerator.formatNumber(1126.0)); + assertEquals("11.13 K", PlanJSONDumpGenerator.formatNumber(11126.0)); + assertEquals("118.13 K", PlanJSONDumpGenerator.formatNumber(118126.0)); + + assertEquals("1.44 M", PlanJSONDumpGenerator.formatNumber(1435126.0)); + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java new file mode 100644 index 0000000..7fea8a6 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyCoGroupFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +public class DummyCoGroupFunction<L, R> extends RichCoGroupFunction<L, R, Tuple2<L, R>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<L> first, Iterable<R> second, Collector<Tuple2<L, R>> out) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java new file mode 100644 index 0000000..6be8a24 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyFlatJoinFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.util.Collector; + +public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void join(T first, T second, Collector<T> out) { + out.collect(null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java new file mode 100644 index 0000000..44d3695 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/DummyReducer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichReduceFunction; + +public class DummyReducer<T> extends RichReduceFunction<T> { + + private static final long serialVersionUID = 1L; + + @Override + public T reduce(T a, T b) { + return a; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java new file mode 100644 index 0000000..0316463 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFlatMapper.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +public class IdentityFlatMapper<T> implements FlatMapFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(T value, Collector<T> out) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java new file mode 100644 index 0000000..11fd044 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.util.Collector; + + +@Combinable +public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<T> values, Collector<T> out) { + for (T next : values) { + out.collect(next); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java new file mode 100644 index 0000000..f335846 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityKeyExtractor.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.java.functions.KeySelector; + +public class IdentityKeyExtractor<T> implements KeySelector<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) { + return value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java new file mode 100644 index 0000000..025b4d8 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityMapper.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichMapFunction; + +public class IdentityMapper<T> extends RichMapFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public T map(T value) { + return value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java new file mode 100644 index 0000000..6efbef1 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityPartitionerMapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.util.Collector; + +public class IdentityPartitionerMapper<T> extends RichMapPartitionFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void mapPartition(Iterable<T> values, Collector<T> out) { + for (T in : values) { + out.collect(in); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java new file mode 100644 index 0000000..39c0e1b --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichReduceFunction; + +public class SelectOneReducer<T> extends RichReduceFunction<T> { + + private static final long serialVersionUID = 1L; + + @Override + public T reduce(T value1, T value2) throws Exception { + return value1; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java new file mode 100644 index 0000000..48d13ca --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.testfunctions; + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.util.Collector; + + +@Combinable +public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<T> values, Collector<T> out) { + out.collect(values.iterator().next()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java new file mode 100644 index 0000000..6a84c44 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.flink.api.java.record.functions.CoGroupFunction; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +@SuppressWarnings("deprecation") +public class DummyCoGroupStub extends CoGroupFunction implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) { + while (records1.hasNext()) { + out.collect(records1.next()); + } + + while (records2.hasNext()) { + out.collect(records2.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java new file mode 100644 index 0000000..8ee2285 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import org.apache.flink.api.java.record.functions.CrossFunction; +import org.apache.flink.types.Record; + +@SuppressWarnings("deprecation") +public class DummyCrossStub extends CrossFunction { + private static final long serialVersionUID = 1L; + + @Override + public Record cross(Record first, Record second) throws Exception { + return first; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java new file mode 100644 index 0000000..0c816e7 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.record.io.DelimitedInputFormat; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.Record; + +public final class DummyInputFormat extends DelimitedInputFormat { + private static final long serialVersionUID = 1L; + + private final IntValue integer = new IntValue(1); + + @Override + public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) { + target.setField(0, this.integer); + target.setField(1, this.integer); + return target; + } + + @Override + public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java new file mode 100644 index 0000000..d00be6e --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.io.Serializable; + +import org.apache.flink.api.java.record.functions.JoinFunction; +import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +@SuppressWarnings("deprecation") +@ConstantFieldsFirstExcept({}) +public class DummyMatchStub extends JoinFunction implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void join(Record value1, Record value2, Collector<Record> out) throws Exception { + out.collect(value1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java new file mode 100644 index 0000000..444b48e --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.io.Serializable; + +import org.apache.flink.api.java.record.functions.JoinFunction; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +@SuppressWarnings("deprecation") +public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void join(Record value1, Record value2, Collector<Record> out) throws Exception { + out.collect(value1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java new file mode 100644 index 0000000..1bbe24c --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.util; + + +import org.apache.flink.api.java.record.io.DelimitedOutputFormat; +import org.apache.flink.types.Record; + + +public final class DummyOutputFormat extends DelimitedOutputFormat { + private static final long serialVersionUID = 1L; + + @Override + public int serializeRecord(Record rec, byte[] target) throws Exception { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java new file mode 100644 index 0000000..cccc6cb --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.io.Serializable; + +import org.apache.flink.api.java.record.functions.MapFunction; +import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +@SuppressWarnings("deprecation") +@ConstantFieldsExcept({}) +public final class IdentityMap extends MapFunction implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void map(Record record, Collector<Record> out) throws Exception { + out.collect(record); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java new file mode 100644 index 0000000..f45745d --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.flink.api.java.record.functions.ReduceFunction; +import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept; +import org.apache.flink.types.Record; +import org.apache.flink.util.Collector; + +@SuppressWarnings("deprecation") +@ConstantFieldsExcept({}) +public final class IdentityReduce extends ReduceFunction implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { + while (records.hasNext()) { + out.collect(records.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/resources/log4j-test.properties b/flink-optimizer/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2fb9345 --- /dev/null +++ b/flink-optimizer/src/test/resources/log4j-test.properties @@ -0,0 +1,19 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/resources/log4j.properties b/flink-optimizer/src/test/resources/log4j.properties new file mode 100644 index 0000000..fa3f937 --- /dev/null +++ b/flink-optimizer/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=INFO, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/resources/logback-test.xml b/flink-optimizer/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-optimizer/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index a59bde4..bf44058 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -109,7 +109,7 @@ under the License. <exclude>org.apache.flink:flink-java</exclude> <exclude>org.apache.flink:flink-scala</exclude> <exclude>org.apache.flink:flink-runtime</exclude> - <exclude>org.apache.flink:flink-compiler</exclude> + <exclude>org.apache.flink:flink-optimizer</exclude> <exclude>org.apache.flink:flink-spargel</exclude> <exclude>org.apache.flink:flink-avro</exclude> <exclude>org.apache.flink:flink-java-examples</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index 43eec6d..1c89170 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -113,7 +113,7 @@ under the License. <exclude>org.apache.flink:flink-java</exclude> <exclude>org.apache.flink:flink-scala</exclude> <exclude>org.apache.flink:flink-runtime</exclude> - <exclude>org.apache.flink:flink-compiler</exclude> + <exclude>org.apache.flink:flink-optimizer</exclude> <exclude>org.apache.flink:flink-spargel</exclude> <exclude>org.apache.flink:flink-avro</exclude> <exclude>org.apache.flink:flink-java-examples</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index 523237e..f350062 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -47,7 +47,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-compiler</artifactId> + <artifactId>flink-optimizer</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-staging/flink-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/pom.xml b/flink-staging/flink-streaming/pom.xml index 43b8181..e7c23db 100644 --- a/flink-staging/flink-streaming/pom.xml +++ b/flink-staging/flink-streaming/pom.xml @@ -49,12 +49,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-compiler</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml index e8da4e9..467fb44 100644 --- a/flink-test-utils/pom.xml +++ b/flink-test-utils/pom.xml @@ -42,7 +42,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-compiler</artifactId> + <artifactId>flink-optimizer</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 7958b14..1670d85 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -46,7 +46,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-compiler</artifactId> + <artifactId>flink-optimizer</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e771ad4..11cca1b 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ under the License. <module>flink-java</module> <module>flink-scala</module> <module>flink-runtime</module> - <module>flink-compiler</module> + <module>flink-optimizer</module> <module>flink-examples</module> <module>flink-clients</module> <module>flink-tests</module>