http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
deleted file mode 100644
index c2e81a8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * Visitor that computes the interesting properties for each node in the optimizer DAG. On its recursive
- * depth-first descend, it propagates all interesting properties top-down.
- */
-public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-
-	private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
-
-	/**
-	 * Creates a new visitor that computes the interesting properties for all nodes in the plan.
-	 * It uses the given cost estimator used to compute the maximal costs for an interesting property.
-	 *
-	 * @param estimator
-	 *        The cost estimator to estimate the maximal costs for interesting properties.
-	 */
-	public InterestingPropertyVisitor(CostEstimator estimator) {
-		this.estimator = estimator;
-	}
-
-	@Override
-	public boolean preVisit(OptimizerNode node) {
-		// The interesting properties must be computed on the descend. In case a node has multiple outputs,
-		// that computation must happen during the last descend.
-
-		if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
-			node.computeUnionOfInterestingPropertiesFromSuccessors();
-			node.computeInterestingPropertiesForInputs(this.estimator);
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {}
-}
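
For orientation, InterestingPropertyVisitor follows the same generic Visitor contract used by all of these traversals: accept() walks the DAG depth-first, preVisit() is called on the way down and its boolean return value decides whether the descent continues, and postVisit() is called on the way back up. Below is a minimal, self-contained sketch of that accept/visit driving loop; the DagNode class, its fields, and the simplified Visitor interface are illustrative stand-ins, not the actual OptimizerNode/Visitor API from the file above.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for org.apache.flink.util.Visitor.
    interface Visitor<T> {
        boolean preVisit(T visitable); // returning false stops the descent below this node
        void postVisit(T visitable);
    }

    // Hypothetical DAG node; OptimizerNode plays this role in the real code.
    class DagNode {
        final String name;
        final List<DagNode> inputs = new ArrayList<>();

        DagNode(String name) { this.name = name; }

        // Depth-first traversal: preVisit gates the recursion into the inputs,
        // postVisit runs once all inputs have been visited.
        void accept(Visitor<DagNode> visitor) {
            if (visitor.preVisit(this)) {
                for (DagNode input : inputs) {
                    input.accept(visitor);
                }
                visitor.postVisit(this);
            }
        }
    }

    public class VisitorSketch {
        public static void main(String[] args) {
            DagNode source = new DagNode("source");
            DagNode map = new DagNode("map");
            DagNode sink = new DagNode("sink");
            map.inputs.add(source);
            sink.inputs.add(map);

            // Such traversals are started at the sink/root nodes and walk towards
            // the sources, which is why properties can be pushed "top-down"
            // during the descent.
            sink.accept(new Visitor<DagNode>() {
                @Override
                public boolean preVisit(DagNode node) {
                    System.out.println("preVisit  " + node.name);
                    return true;
                }

                @Override
                public void postVisit(DagNode node) {
                    System.out.println("postVisit " + node.name);
                }
            });
        }
    }
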
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java deleted file mode 100644 index 58aa3c1..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.traversals; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.plan.BinaryUnionPlanNode; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.IterationPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.SolutionSetPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plan.WorksetPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * This visitor traverses the selected execution plan and finalizes it: - * - * <ul> - * <li>The graph of nodes is double-linked (links from child to parent are inserted).</li> - * <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.</li> - * <li>Relative memory fractions are assigned to all nodes.</li> - * <li>All nodes are collected into a set.</li> - * </ul> - */ -public class PlanFinalizer implements Visitor<PlanNode> { - - private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan - - private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan - - private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan - - private final Deque<IterationPlanNode> stackOfIterationNodes; - - private int memoryConsumerWeights; // a counter of all memory consumers - - /** - * Creates a new plan finalizer. 
- */ - public PlanFinalizer() { - this.allNodes = new HashSet<PlanNode>(); - this.sources = new ArrayList<SourcePlanNode>(); - this.sinks = new ArrayList<SinkPlanNode>(); - this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>(); - } - - public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) { - this.memoryConsumerWeights = 0; - - // traverse the graph - for (SinkPlanNode node : sinks) { - node.accept(this); - } - - // assign the memory to each node - if (this.memoryConsumerWeights > 0) { - for (PlanNode node : this.allNodes) { - // assign memory to the driver strategy of the node - final int consumerWeight = node.getMemoryConsumerWeight(); - if (consumerWeight > 0) { - final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights; - node.setRelativeMemoryPerSubtask(relativeMem); - if (Optimizer.LOG.isDebugEnabled()) { - Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " + - node.getProgramOperator().getName() + "."); - } - } - - // assign memory to the local and global strategies of the channels - for (Channel c : node.getInputs()) { - if (c.getLocalStrategy().dams()) { - final double relativeMem = 1.0 / this.memoryConsumerWeights; - c.setRelativeMemoryLocalStrategy(relativeMem); - if (Optimizer.LOG.isDebugEnabled()) { - Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " + - "instance of " + c + "."); - } - } - if (c.getTempMode() != TempMode.NONE) { - final double relativeMem = 1.0/ this.memoryConsumerWeights; - c.setRelativeTempMemory(relativeMem); - if (Optimizer.LOG.isDebugEnabled()) { - Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " + - "table for " + c + "."); - } - } - } - } - } - return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan); - } - - @Override - public boolean preVisit(PlanNode visitable) { - // if we come here again, prevent a further descend - if (!this.allNodes.add(visitable)) { - return false; - } - - if (visitable instanceof SinkPlanNode) { - this.sinks.add((SinkPlanNode) visitable); - } - else if (visitable instanceof SourcePlanNode) { - this.sources.add((SourcePlanNode) visitable); - } - else if (visitable instanceof BinaryUnionPlanNode) { - BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable; - if (unionNode.unionsStaticAndDynamicPath()) { - unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED); - } - } - else if (visitable instanceof BulkPartialSolutionPlanNode) { - // tell the partial solution about the iteration node that contains it - final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! - if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - pspn.setContainingIterationNode((BulkIterationPlanNode) iteration); - } - else if (visitable instanceof WorksetPlanNode) { - // tell the partial solution about the iteration node that contains it - final WorksetPlanNode wspn = (WorksetPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! 
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration); - } - else if (visitable instanceof SolutionSetPlanNode) { - // tell the partial solution about the iteration node that contains it - final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable; - final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast(); - - // sanity check! - if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) { - throw new CompilerException("Bug: Error finalizing the plan. " + - "Cannot associate the node for a partial solutions with its containing iteration."); - } - sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration); - } - - // double-connect the connections. previously, only parents knew their children, because - // one child candidate could have been referenced by multiple parents. - for (Channel conn : visitable.getInputs()) { - conn.setTarget(visitable); - conn.getSource().addOutgoingChannel(conn); - } - - for (Channel c : visitable.getBroadcastInputs()) { - c.setTarget(visitable); - c.getSource().addOutgoingChannel(c); - } - - // count the memory consumption - this.memoryConsumerWeights += visitable.getMemoryConsumerWeight(); - for (Channel c : visitable.getInputs()) { - if (c.getLocalStrategy().dams()) { - this.memoryConsumerWeights++; - } - if (c.getTempMode() != TempMode.NONE) { - this.memoryConsumerWeights++; - } - } - for (Channel c : visitable.getBroadcastInputs()) { - if (c.getLocalStrategy().dams()) { - this.memoryConsumerWeights++; - } - if (c.getTempMode() != TempMode.NONE) { - this.memoryConsumerWeights++; - } - } - - // pass the visitor to the iteraton's step function - if (visitable instanceof IterationPlanNode) { - // push the iteration node onto the stack - final IterationPlanNode iterNode = (IterationPlanNode) visitable; - this.stackOfIterationNodes.addLast(iterNode); - - // recurse - ((IterationPlanNode) visitable).acceptForStepFunction(this); - - // pop the iteration node from the stack - this.stackOfIterationNodes.removeLast(); - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java deleted file mode 100644 index c0dc4dd..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.traversals; - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.IterationNode; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.util.Visitor; - -import java.util.HashSet; -import java.util.Set; - -/** - * A traversal that goes over the program data flow of an iteration and makes the nodes - * that depend on the partial solution (the data set recomputed in each iteration) as "dynamic" - * and the other nodes as "static". - */ -public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> { - - private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>(); - - private final int costWeight; - - public StaticDynamicPathIdentifier(int costWeight) { - this.costWeight = costWeight; - } - - @Override - public boolean preVisit(OptimizerNode visitable) { - return this.seenBefore.add(visitable); - } - - @Override - public void postVisit(OptimizerNode visitable) { - visitable.identifyDynamicPath(this.costWeight); - - // check that there is no nested iteration on the dynamic path - if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) { - throw new CompilerException("Nested iterations are currently not supported."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java deleted file mode 100644 index d359490..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.traversals; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.base.DeltaIterationBase; -import org.apache.flink.util.Visitor; - -import java.util.HashSet; -import java.util.Set; - -/** - * A traversal that checks if the Workset of a delta iteration is used in the data flow - * of its step function. 
- */ -public class StepFunctionValidator implements Visitor<Operator<?>> { - - private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>(); - - private boolean foundWorkset; - - @Override - public boolean preVisit(Operator<?> visitable) { - if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) { - foundWorkset = true; - } - - return (!foundWorkset) && seenBefore.add(visitable); - } - - @Override - public void postVisit(Operator<?> visitable) {} - - public boolean hasFoundWorkset() { - return foundWorkset; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java deleted file mode 100644 index cd8766c..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains the various traversals over the program plan and the - * optimizer DAG (directed acyclic graph) that are made in the course of - * the optimization. - * - * The traversals are mostly implemented as a {@link org.apache.flink.util.Visitor} that - * traversed the program flow. - */ -package org.apache.flink.optimizer.traversals; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java deleted file mode 100644 index 5110849..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.util; - -import java.util.List; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.NoOpFunction; -import org.apache.flink.api.common.operators.BinaryOperatorInformation; -import org.apache.flink.api.common.operators.DualInputOperator; -import org.apache.flink.api.common.operators.RecordOperator; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.types.Key; - - -public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator { - - public NoOpBinaryUdfOp(TypeInformation<OUT> type) { - super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract"); - } - - @SuppressWarnings("unchecked") - @Override - public Class<? extends Key<?>>[] getKeyClasses() { - return (Class<? extends Key<?>>[]) new Class[0]; - } - - @Override - protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { - throw new UnsupportedOperationException(); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java deleted file mode 100644 index cc4a4d6..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer.util; - -import java.util.List; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.NoOpFunction; -import org.apache.flink.api.common.operators.RecordOperator; -import org.apache.flink.api.common.operators.SingleInputOperator; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.types.Key; - - -public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator { - - @SuppressWarnings("rawtypes") - public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp(); - - private NoOpUnaryUdfOp() { - // pass null here because we override getOutputType to return type - // of input operator - super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, ""); - } - - @SuppressWarnings("unchecked") - @Override - public Class<? extends Key<?>>[] getKeyClasses() { - return (Class<? extends Key<?>>[]) new Class[0]; - } - - @Override - public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() { - TypeInformation<OUT> previousOut = input.getOperatorInfo().getOutputType(); - return new UnaryOperatorInformation<OUT, OUT>(previousOut, previousOut); - } - - @Override - protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) { - return inputData; - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java deleted file mode 100644 index d8f33a2..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.optimizer.util; - -import java.util.Arrays; - -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.CompilerException; - - -/** - * - */ -public class Utils -{ - public static final FieldList createOrderedFromSet(FieldSet set) { - if (set instanceof FieldList) { - return (FieldList) set; - } else { - final int[] cols = set.toArray(); - Arrays.sort(cols); - return new FieldList(cols); - } - } - - public static final Ordering createOrdering(FieldList fields, boolean[] directions) { - final Ordering o = new Ordering(); - for (int i = 0; i < fields.size(); i++) { - o.appendOrdering(fields.get(i), null, directions == null || directions[i] ? Order.ASCENDING : Order.DESCENDING); - } - return o; - } - - public static final Ordering createOrdering(FieldList fields) { - final Ordering o = new Ordering(); - for (int i = 0; i < fields.size(); i++) { - o.appendOrdering(fields.get(i), null, Order.ANY); - } - return o; - } - - public static boolean[] getDirections(Ordering o, int numFields) { - final boolean[] dirs = o.getFieldSortDirections(); - if (dirs.length == numFields) { - return dirs; - } else if (dirs.length > numFields) { - final boolean[] subSet = new boolean[numFields]; - System.arraycopy(dirs, 0, subSet, 0, numFields); - return subSet; - } else { - throw new CompilerException(); - } - } - - // -------------------------------------------------------------------------------------------- - - /** - * No instantiation. - */ - private Utils() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java deleted file mode 100644 index 1e4bafb..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.CrossWithLargeOperator; -import org.apache.flink.api.java.record.operators.CrossWithSmallOperator; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.junit.Test; - -/** -* Tests that validate optimizer choices when using operators that are requesting certain specific execution -* strategies. -*/ -@SuppressWarnings({"serial", "deprecation"}) -public class AdditionalOperatorsTest extends CompilerTestBase { - - @Test - public void testCrossWithSmall() { - // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); - - CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub()) - .input1(source1).input2(source2) - .name("Cross").build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - - try { - OptimizedPlan oPlan = compileNoStats(plan); - OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); - - DualInputPlanNode crossPlanNode = resolver.getNode("Cross"); - Channel in1 = crossPlanNode.getInput1(); - Channel in2 = crossPlanNode.getInput2(); - - assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy()); - assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy()); - } catch(CompilerException ce) { - ce.printStackTrace(); - fail("The pact compiler is unable to compile this plan correctly."); - } - } - - @Test - public void testCrossWithLarge() { - // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2"); - - CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub()) - .input1(source1).input2(source2) - .name("Cross").build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink"); - - Plan plan = new Plan(sink); - plan.setDefaultParallelism(DEFAULT_PARALLELISM); - - - try { - OptimizedPlan oPlan = compileNoStats(plan); - OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan); - - DualInputPlanNode crossPlanNode = resolver.getNode("Cross"); - Channel in1 = crossPlanNode.getInput1(); - Channel in2 = crossPlanNode.getInput2(); - - assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy()); - assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy()); - } catch(CompilerException ce) { - ce.printStackTrace(); - fail("The pact compiler is unable to compile this plan correctly."); - } - } -} - 
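
One detail of the PlanFinalizer shown further above that is easy to miss: the relative memory assignment is plain proportional arithmetic. Every memory consumer (a driver strategy with a positive consumer weight, a damming local strategy on a channel, or a temp barrier) contributes to memoryConsumerWeights, and each consumer is then given consumerWeight / memoryConsumerWeights of the task's managed memory. A small stand-alone sketch of that split follows; the consumer names and weights are hypothetical, chosen only to make the fractions visible.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MemoryFractionSketch {
        public static void main(String[] args) {
            // Hypothetical consumers; in the real plan these would be driver
            // strategies, damming local strategies, and temp barriers.
            Map<String, Integer> consumerWeights = new LinkedHashMap<>();
            consumerWeights.put("sort driver", 1);
            consumerWeights.put("hash-join driver", 1);
            consumerWeights.put("temp barrier", 1);

            // Corresponds to PlanFinalizer#memoryConsumerWeights.
            int totalWeight = consumerWeights.values().stream()
                    .mapToInt(Integer::intValue)
                    .sum();

            // Each consumer gets consumerWeight / totalWeight of the managed memory,
            // mirroring node.setRelativeMemoryPerSubtask(relativeMem) above.
            for (Map.Entry<String, Integer> entry : consumerWeights.entrySet()) {
                double relativeMem = (double) entry.getValue() / totalWeight;
                System.out.printf("%-16s -> %.3f of the task's managed memory%n",
                        entry.getKey(), relativeMem);
            }
        }
    }
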
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java deleted file mode 100644 index 916aa27..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java +++ /dev/null @@ -1,1039 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; -import org.apache.flink.optimizer.testfunctions.SelectOneReducer; -import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichJoinFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.CoGroupOperator; -import org.apache.flink.api.java.record.operators.CrossOperator; -import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; -import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; -import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; -import 
org.apache.flink.optimizer.testfunctions.Top1GroupReducer; -import org.apache.flink.optimizer.util.DummyCoGroupStub; -import org.apache.flink.optimizer.util.DummyCrossStub; -import org.apache.flink.optimizer.util.DummyInputFormat; -import org.apache.flink.optimizer.util.DummyMatchStub; -import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub; -import org.apache.flink.optimizer.util.DummyOutputFormat; -import org.apache.flink.optimizer.util.IdentityMap; -import org.apache.flink.optimizer.util.IdentityReduce; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; - -@SuppressWarnings({"serial", "deprecation"}) -public class BranchingPlansCompilerTest extends CompilerTestBase { - - - @Test - public void testCostComputationWithMultipleDataSinks() { - final int SINKS = 5; - - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - DataSet<Long> source = env.generateSequence(1, 10000); - - DataSet<Long> mappedA = source.map(new IdentityMapper<Long>()); - DataSet<Long> mappedC = source.map(new IdentityMapper<Long>()); - - for (int sink = 0; sink < SINKS; sink++) { - mappedA.output(new DiscardingOutputFormat<Long>()); - mappedC.output(new DiscardingOutputFormat<Long>()); - } - - Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks"); - OptimizedPlan oPlan = compileNoStats(plan); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - - /** - * - * <pre> - * (SRC A) - * | - * (MAP A) - * / \ - * (MAP B) (MAP C) - * / / \ - * (SINK A) (SINK B) (SINK C) - * </pre> - */ - @SuppressWarnings("unchecked") - @Test - public void testBranchingWithMultipleDataSinks2() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - DataSet<Long> source = env.generateSequence(1, 10000); - - DataSet<Long> mappedA = source.map(new IdentityMapper<Long>()); - DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>()); - DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>()); - - mappedB.output(new DiscardingOutputFormat<Long>()); - mappedC.output(new DiscardingOutputFormat<Long>()); - mappedC.output(new DiscardingOutputFormat<Long>()); - - Plan plan = env.createProgramPlan(); - Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks()); - - OptimizedPlan oPlan = compileNoStats(plan); - - // ---------- check the optimizer plan ---------- - - // number of sinks - assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size()); - - // remove matching sinks to check relation - for (SinkPlanNode sink : oPlan.getDataSinks()) { - assertTrue(sinks.remove(sink.getProgramOperator())); - } - assertTrue(sinks.isEmpty()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - - /** - * <pre> - * SINK - * | - * COGROUP - * +---/ \----+ - * / \ - * / MATCH10 - * / | \ - * / | MATCH9 - * MATCH5 | | \ - * | \ | | MATCH8 - * | MATCH4 | | | \ - * | | \ | | | MATCH7 - * | | MATCH3 | | | | \ - * | | | \ | | | | MATCH6 - * | | | MATCH2 | | | | | | - * | | | | \ +--+--+--+--+--+ - * | | | | MATCH1 MAP - * \ | | | | | /-----------/ - * (DATA SOURCE ONE) - * </pre> - */ - @Test - public void testBranchingSourceMultipleTimes() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - 
env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000) - .map(new Duplicator<Long>()); - - DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> mapped = source.map( - new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(Tuple2<Long, Long> value) { - return null; - } - }); - - DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - - joined5.coGroup(joined10) - .where(1).equalTo(1) - .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>()) - - .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()); - - Plan plan = env.createProgramPlan(); - OptimizedPlan oPlan = compileNoStats(plan); - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * - * <pre> - - * (SINK A) - * | (SINK B) (SINK C) - * CROSS / / - * / \ | +------+ - * / \ | / - * REDUCE MATCH2 - * | +---/ \ - * \ / | - * MAP | - * | | - * COGROUP MATCH1 - * / \ / \ - * (SRC A) (SRC B) (SRC C) - * </pre> - */ - @Test - public void testBranchingWithMultipleDataSinks() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000) - .map(new Duplicator<Long>()); - - DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000) - .map(new Duplicator<Long>()); - - DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000) - .map(new Duplicator<Long>()); - - DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB) - .where(0).equalTo(1) - .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - public void coGroup(Iterable<Tuple2<Long, Long>> first, - Iterable<Tuple2<Long, Long>> second, - Collector<Tuple2<Long, Long>> out) { - } - }) - .map(new IdentityMapper<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC) - .where(0).equalTo(1) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> joined2 = 
mapped.join(joined) - .where(1).equalTo(1) - .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>()); - - DataSet<Tuple2<Long, Long>> reduced = mapped - .groupBy(1) - .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()); - - reduced.cross(joined2) - .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()); - - joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); - joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()); - - Plan plan = env.createProgramPlan(); - OptimizedPlan oPlan = compileNoStats(plan); - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @SuppressWarnings("unchecked") - @Test - public void testBranchEachContractType() { - try { - // construct the plan - FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A"); - FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B"); - FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C"); - - MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build(); - - ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(map1) - .name("Reduce 1") - .build(); - - JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(sourceB, sourceB, sourceC) - .input2(sourceC) - .name("Match 1") - .build(); - ; - CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(sourceA) - .input2(sourceB) - .name("CoGroup 1") - .build(); - - CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()) - .input1(reduce1) - .input2(cogroup1) - .name("Cross 1") - .build(); - - - CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cross1) - .input2(cross1) - .name("CoGroup 2") - .build(); - - CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(map1) - .input2(match1) - .name("CoGroup 3") - .build(); - - - MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build(); - - CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(map2) - .input2(match1) - .name("CoGroup 4") - .build(); - - CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cogroup2) - .input2(cogroup1) - .name("CoGroup 5") - .build(); - - CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(reduce1) - .input2(cogroup4) - .name("CoGroup 6") - .build(); - - CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0) - .input1(cogroup5) - .input2(cogroup6) - .name("CoGroup 7") - .build(); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7); - sink.addInput(sourceA); - sink.addInput(cogroup3); - sink.addInput(cogroup4); - sink.addInput(cogroup1); - - // return the PACT plan - Plan plan = new Plan(sink, "Branching of each contract type"); - - OptimizedPlan oPlan = compileNoStats(plan); - - JobGraphGenerator jobGen = new JobGraphGenerator(); - - //Compile plan to verify that no error is thrown - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - 
} - - - @Test - public void testBranchingUnion() { - try { - // construct the plan - FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE); - FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE); - - JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(source1) - .input2(source2) - .name("Match 1") - .build(); - - MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build(); - - ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(ma1) - .name("Reduce 1") - .build(); - - ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0) - .input(mat1) - .name("Reduce 2") - .build(); - - MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build(); - - MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build(); - - @SuppressWarnings("unchecked") - JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0) - .input1(r1, r2, ma2, ma3) - .input2(ma2) - .name("Match 2") - .build(); - mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE); - - FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2); - - - // return the PACT plan - Plan plan = new Plan(sink, "Branching Union"); - - OptimizedPlan oPlan = compileNoStats(plan); - - JobGraphGenerator jobGen = new JobGraphGenerator(); - - //Compile plan to verify that no error is thrown - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * - * <pre> - * (SRC A) - * / \ - * (SINK A) (SINK B) - * </pre> - */ - @Test - public void testBranchingWithMultipleDataSinksSmall() { - try { - // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); - - FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA); - FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sinkA); - sinks.add(sinkB); - - // return the PACT plan - Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks"); - - OptimizedPlan oPlan = compileNoStats(plan); - - // ---------- check the optimizer plan ---------- - - // number of sinks - Assert.assertEquals("Wrong number of data sinks.", 2, oPlan.getDataSinks().size()); - - // sinks contain all sink paths - Set<String> allSinks = new HashSet<String>(); - allSinks.add(out1Path); - allSinks.add(out2Path); - - for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); - Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); - } - - // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - - JobGraphGenerator jobGen = new JobGraphGenerator(); - jobGen.compileJobGraph(oPlan); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * - * <pre> - * (SINK 3) (SINK 1) (SINK 2) (SINK 4) - * \ / \ / - * (SRC A) (SRC B) - * </pre> - * - * NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught. 
- */ - @Test - public void testBranchingDisjointPlan() { - // construct the plan - final String out1Path = "file:///test/1"; - final String out2Path = "file:///test/2"; - final String out3Path = "file:///test/3"; - final String out4Path = "file:///test/4"; - - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2"); - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3"); - FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4"); - - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); - sinks.add(sink4); - - // return the PACT plan - Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches"); - compileNoStats(plan); - } - - @Test - public void testBranchAfterIteration() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(sourceA); - iteration.setMaximumNumberOfIterations(10); - - MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build(); - iteration.setNextPartialSolution(mapper); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1"); - - MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper") - .input(iteration).build(); - - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2"); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink1); - sinks.add(sink2); - - Plan plan = new Plan(sinks); - - try { - compileNoStats(plan); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testBranchBeforeIteration() { - FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(source2); - iteration.setMaximumNumberOfIterations(10); - - MapOperator inMap = MapOperator.builder(new IdentityMap()) - .input(source1) - .name("In Iteration Map") - .setBroadcastVariable("BC", iteration.getPartialSolution()) - .build(); - - iteration.setNextPartialSolution(inMap); - - MapOperator postMap = MapOperator.builder(new IdentityMap()) - .input(source1) - .name("Post Iteration Map") - .setBroadcastVariable("BC", iteration) - .build(); - - FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink"); - - Plan plan = new Plan(sink); - - try { - compileNoStats(plan); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * Test to ensure that sourceA is inside as well as outside of the iteration the same - * node. 
- * - * <pre> - * (SRC A) (SRC B) - * / \ / \ - * (SINK 1) (ITERATION) | (SINK 2) - * / \ / - * (SINK 3) (CROSS => NEXT PARTIAL SOLUTION) - * </pre> - */ - @Test - public void testClosure() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(sourceA); - iteration.setMaximumNumberOfIterations(10); - - CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction"). - input1(iteration.getPartialSolution()). - input2(sourceB). - build(); - - iteration.setNextPartialSolution(stepFunction); - - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3"); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); - - Plan plan = new Plan(sinks); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * <pre> - * (SRC A) (SRC B) (SRC C) - * / \ / / \ - * (SINK 1) (DELTA ITERATION) | (SINK 2) - * / | \ / - * (SINK 3) | (CROSS => NEXT WORKSET) - * | | - * (JOIN => SOLUTION SET DELTA) - * </pre> - */ - @Test - public void testClosureDeltaIteration() { - FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1"); - FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2"); - FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3"); - - FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1"); - FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2"); - - DeltaIteration iteration = new DeltaIteration(0, "Loop"); - iteration.setInitialSolutionSet(sourceA); - iteration.setInitialWorkset(sourceB); - iteration.setMaximumNumberOfIterations(10); - - CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset"). - input1(iteration.getWorkset()). - input2(sourceC). - build(); - - JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0). - name("Next solution set."). - input1(nextWorkset). - input2(iteration.getSolutionSet()). 
- build(); - - iteration.setNextWorkset(nextWorkset); - iteration.setSolutionSetDelta(solutionSetDelta); - - FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3"); - - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink1); - sinks.add(sink2); - sinks.add(sink3); - - Plan plan = new Plan(sinks); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * <pre> - * +----Iteration-------+ - * | | - * /---------< >---------join-----< >---sink - * / (Solution)| / | - * / | / | - * /--map-------< >----\ / /--| - * / (Workset)| \ / / | - * src-map | join------/ | - * \ | / | - * \ +-----/--------------+ - * \ / - * \--reduce-------/ - * </pre> - */ - @Test - public void testDeltaIterationWithStaticInput() { - FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source"); - - MapOperator mappedSource = MapOperator.builder(IdentityMap.class). - input(source). - name("Identity mapped source"). - build(); - - ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class). - input(source). - name("Identity reduce source"). - build(); - - DeltaIteration iteration = new DeltaIteration(0,"Loop"); - iteration.setMaximumNumberOfIterations(10); - iteration.setInitialSolutionSet(source); - iteration.setInitialWorkset(mappedSource); - - JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0). - input1(iteration.getWorkset()). - input2(reducedSource). - name("Next work set"). - build(); - - JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, - 0). - input1(iteration.getSolutionSet()). - input2(nextWorkset). - name("Solution set delta"). 
- build(); - - iteration.setNextWorkset(nextWorkset); - iteration.setSolutionSetDelta(solutionSetDelta); - - FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink"); - List<FileDataSink> sinks = new ArrayList<FileDataSink>(); - sinks.add(sink); - - Plan plan = new Plan(sinks); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - /** - * <pre> - * +---------Iteration-------+ - * | | - * /--map--< >----\ | - * / | \ /-------< >---sink - * src-map | join------/ | - * \ | / | - * \ +-----/-------------------+ - * \ / - * \--reduce--/ - * </pre> - */ - @Test - public void testIterationWithStaticInput() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<Long> source = env.generateSequence(1, 1000000); - - DataSet<Long> mapped = source.map(new IdentityMapper<Long>()); - - DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>()); - - IterativeDataSet<Long> iteration = mapped.iterate(10); - iteration.closeWith( - iteration.join(reduced) - .where(new IdentityKeyExtractor<Long>()) - .equalTo(new IdentityKeyExtractor<Long>()) - .with(new DummyFlatJoinFunction<Long>())) - .output(new DiscardingOutputFormat<Long>()); - - compileNoStats(env.createProgramPlan()); - } - catch(Exception e){ - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testBranchingBroadcastVariable() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1"); - DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2"); - DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3"); - - DataSet<String> result1 = input1 - .map(new IdentityMapper<String>()) - .reduceGroup(new Top1GroupReducer<String>()) - .withBroadcastSet(input3, "bc"); - - DataSet<String> result2 = input2 - .map(new IdentityMapper<String>()) - .reduceGroup(new Top1GroupReducer<String>()) - .withBroadcastSet(input3, "bc"); - - result1.join(result2) - .where(new IdentityKeyExtractor<String>()) - .equalTo(new IdentityKeyExtractor<String>()) - .with(new RichJoinFunction<String, String, String>() { - @Override - public String join(String first, String second) { - return null; - } - }) - .withBroadcastSet(input3, "bc1") - .withBroadcastSet(input1, "bc2") - .withBroadcastSet(result1, "bc3") - .print(); - - Plan plan = env.createProgramPlan(); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testBCVariableClosure() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> input = env.readTextFile(IN_FILE).name("source1"); - - DataSet<String> reduced = input - .map(new IdentityMapper<String>()) - .reduceGroup(new Top1GroupReducer<String>()); - - - DataSet<String> initialSolution = input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc"); - - - IterativeDataSet<String> iteration = initialSolution.iterate(100); - - iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red")) - .print(); - - Plan plan = env.createProgramPlan(); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void 
testMultipleIterations() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<String> input = env.readTextFile(IN_FILE).name("source1"); - - DataSet<String> reduced = input - .map(new IdentityMapper<String>()) - .reduceGroup(new Top1GroupReducer<String>()); - - IterativeDataSet<String> iteration1 = input.iterate(100); - IterativeDataSet<String> iteration2 = input.iterate(20); - IterativeDataSet<String> iteration3 = input.iterate(17); - - iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print(); - iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print(); - iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print(); - - Plan plan = env.createProgramPlan(); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testMultipleIterationsWithClosueBCVars() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<String> input = env.readTextFile(IN_FILE).name("source1"); - - IterativeDataSet<String> iteration1 = input.iterate(100); - IterativeDataSet<String> iteration2 = input.iterate(20); - IterativeDataSet<String> iteration3 = input.iterate(17); - - - iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print(); - iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print(); - iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print(); - - Plan plan = env.createProgramPlan(); - - try{ - compileNoStats(plan); - }catch(Exception e){ - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testBranchesOnlyInBCVariables1() { - try{ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<Long> input = env.generateSequence(1, 10); - DataSet<Long> bc_input = env.generateSequence(1, 10); - - input - .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1") - .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2") - .print(); - - Plan plan = env.createProgramPlan(); - compileNoStats(plan); - } - catch(Exception e){ - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testBranchesOnlyInBCVariables2() { - try{ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(100); - - DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input"); - - DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1"); - DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 1"); - - DataSet<Tuple2<Long, Long>> joinInput1 = - input.map(new IdentityMapper<Tuple2<Long,Long>>()) - .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1") - .withBroadcastSet(bc_input2, "bc2"); - - DataSet<Tuple2<Long, Long>> joinInput2 = - input.map(new IdentityMapper<Tuple2<Long,Long>>()) - .withBroadcastSet(bc_input1, "bc1") - .withBroadcastSet(bc_input2, "bc2"); - - DataSet<Tuple2<Long, Long>> joinResult = joinInput1 - .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1) - .with(new DummyFlatJoinFunction<Tuple2<Long,Long>>()); - - input - .map(new IdentityMapper<Tuple2<Long,Long>>()) 
- .withBroadcastSet(bc_input1, "bc1") - .union(joinResult) - .print(); - - Plan plan = env.createProgramPlan(); - compileNoStats(plan); - } - catch(Exception e){ - e.printStackTrace(); - fail(e.getMessage()); - } - } - - private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> { - - @Override - public Tuple2<T, T> map(T value) { - return new Tuple2<T, T>(value, value); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java deleted file mode 100644 index c7ad2da..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.optimizer; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.Plan; -import org.junit.Test; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; - -@SuppressWarnings("serial") -public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase { - - @Test - public void testNoBreakerForIndependentVariable() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> source1 = env.fromElements("test"); - DataSet<String> source2 = env.fromElements("test"); - - source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); - - assertEquals(TempMode.NONE, mapper.getInput().getTempMode()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testBreakerForDependentVariable() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> source1 = env.fromElements("test"); - - source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print(); - - Plan p = env.createProgramPlan(); - OptimizedPlan op = compileNoStats(p); - - SinkPlanNode sink = op.getDataSinks().iterator().next(); - SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource(); - - assertEquals(TempMode.PIPELINE_BREAKER, mapper.getInput().getTempMode()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java deleted file mode 100644 index 3e7da6c..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.optimizer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.junit.Test; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.api.common.functions.RichJoinFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** -* Tests that validate optimizer choice when using hash joins inside of iterations -*/ -@SuppressWarnings("serial") -public class CachedMatchStrategyCompilerTest extends CompilerTestBase { - - /** - * This tests whether a HYBRIDHASH_BUILD_SECOND is correctly transformed to a HYBRIDHASH_BUILD_SECOND_CACHED - * when inside of an iteration an on the static path - */ - @Test - public void testRightSide() { - try { - - Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); - - OptimizedPlan oPlan = compileNoStats(plan); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); - - // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); - assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); - assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test errored: " + e.getMessage()); - } - } - - /** - * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant - */ - @Test - public void testRightSideCountercheck() { - try { - - Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); - - OptimizedPlan oPlan = compileNoStats(plan); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); - - // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy()); - assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); - assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test errored: " + e.getMessage()); - } - } - - /** - * This tests whether a HYBRIDHASH_BUILD_FIRST is correctly transformed to a HYBRIDHASH_BUILD_FIRST_CACHED - * when inside of an iteration an on the static path - */ - @Test - public void testLeftSide() { - try { - - Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); - - OptimizedPlan oPlan = compileNoStats(plan); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); - - // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, 
innerJoin.getDriverStrategy()); - assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); - assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test errored: " + e.getMessage()); - } - } - - /** - * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant - */ - @Test - public void testLeftSideCountercheck() { - try { - - Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); - - OptimizedPlan oPlan = compileNoStats(plan); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); - - // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy()); - assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode()); - assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test errored: " + e.getMessage()); - } - } - - /** - * This test simulates a join of a big left side with a small right side inside of an iteration, where the small side is on a static path. - * Currently the best execution plan is a HYBRIDHASH_BUILD_SECOND_CACHED, where the small side is hashed and cached. - * This test also makes sure that all relevant plans are correctly enumerated by the optimizer. - */ - @Test - public void testCorrectChoosing() { - try { - - Plan plan = getTestPlanRightStatic(""); - - SourceCollectorVisitor sourceCollector = new SourceCollectorVisitor(); - plan.accept(sourceCollector); - - for(GenericDataSourceBase<?, ?> s : sourceCollector.getSources()) { - if(s.getName().equals("bigFile")) { - this.setSourceStatistics(s, 10000000, 1000); - } - else if(s.getName().equals("smallFile")) { - this.setSourceStatistics(s, 100, 100); - } - } - - - OptimizedPlan oPlan = compileNoStats(plan); - - OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); - DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); - - // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); - assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); - assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - - new JobGraphGenerator().compileJobGraph(oPlan); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test errored: " + e.getMessage()); - } - } - - private Plan getTestPlanRightStatic(String strategy) { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile"); - - DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile").types(Long.class, Long.class, Long.class).name("smallFile"); - - IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10); - - Configuration joinStrategy = new Configuration(); - joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); - - if(strategy != "") { - joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, 
strategy); - } - - DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); - - DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner); - - output.print(); - - return env.createProgramPlan(); - - } - - private Plan getTestPlanLeftStatic(String strategy) { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DEFAULT_PARALLELISM); - - @SuppressWarnings("unchecked") - DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L), - new Tuple3<Long, Long, Long>(1L, 2L, 3L),new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big"); - - @SuppressWarnings("unchecked") - DataSet<Tuple3<Long, Long, Long>> smallInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small"); - - IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10); - - Configuration joinStrategy = new Configuration(); - joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy); - - DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); - - DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner); - - output.print(); - - return env.createProgramPlan(); - - } - - private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> { - - @Override - public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, - Tuple3<Long, Long, Long> second) throws Exception { - - return first; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java deleted file mode 100644 index eba07f1..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.optimizer; - -import org.apache.flink.api.common.functions.RichCoGroupFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.junit.Assert; -import org.junit.Test; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Collector; -import org.apache.flink.util.Visitor; - -@SuppressWarnings("serial") -public class CoGroupSolutionSetFirstTest extends CompilerTestBase { - - public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> { - @Override - public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {} - } - - public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> { - @Override - public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception { - return null; - } - } - - @Test - public void testCoGroupSolutionSet() { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class); - - DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0); - - DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap()); - DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup()); - DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap()); - DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback); - - result.print(); - - Plan plan = env.createProgramPlan(); - OptimizedPlan oPlan = null; - try { - oPlan = compileNoStats(plan); - } catch(CompilerException e) { - Assert.fail(e.getMessage()); - } - - oPlan.accept(new Visitor<PlanNode>() { - @Override - public boolean preVisit(PlanNode visitable) { - if (visitable instanceof WorksetIterationPlanNode) { - PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode(); - - //get the CoGroup - DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource(); - Channel in1 = dpn.getInput1(); - Channel in2 = dpn.getInput2(); - - Assert.assertTrue(in1.getLocalProperties().getOrdering() == null); - Assert.assertTrue(in2.getLocalProperties().getOrdering() != null); - Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0)); - Assert.assertTrue(in1.getShipStrategy() == ShipStrategyType.FORWARD); - Assert.assertTrue(in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH); - return false; - } - return true; - } - - @Override - public void postVisit(PlanNode visitable) {} - }); - } -}
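
Editorial aside (not part of the commit above): the removed CachedMatchStrategyCompilerTest revolves around one pattern — pinning a join's local strategy via a Configuration hint for a join whose build side lies on the static path of an iteration, so the optimizer can promote the plain hybrid-hash strategy to its cached variant. A minimal, self-contained sketch of that pattern, assembled only from the pre-1.0 DataSet API calls and Optimizer constants that appear verbatim in the deleted test (class, method, and variable names here are illustrative, not from the original), might look roughly like this:

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;

@SuppressWarnings("serial")
public class StaticPathJoinHintSketch {

    public static Plan buildPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The large input feeds the iteration (dynamic path); the small input stays
        // on the static path, so its hash table can be built once and reused.
        DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile")
                .types(Long.class, Long.class, Long.class).name("bigFile");
        DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile")
                .types(Long.class, Long.class, Long.class).name("smallFile");

        IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);

        // Hint the optimizer to build the hash table on the second (static) input;
        // inside the iteration this is the candidate for the *_CACHED variant.
        Configuration joinStrategy = new Configuration();
        joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY,
                Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);

        DataSet<Tuple3<Long, Long, Long>> stepResult = iteration
                .join(smallInput).where(0).equalTo(0)
                .with(new ForwardFirstJoiner()).name("StaticPathJoin")
                .withParameters(joinStrategy);

        // Close the loop and attach a sink, then hand the plan to the optimizer
        // (the deleted tests compile it via compileNoStats(...)).
        iteration.closeWith(stepResult).print();
        return env.createProgramPlan();
    }

    private static class ForwardFirstJoiner
            extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
        @Override
        public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
            return first;
        }
    }
}

In the deleted test, the optimizer compiled such a plan to DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED with TempMode.NONE on both join inputs; with the opposite build-side hint, the static input was instead materialized with TempMode.CACHED.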