http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java deleted file mode 100644 index 7ae35c3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class NoOpDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.UNARY_NO_OP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Pipe", in, DriverStrategy.UNARY_NO_OP); - } - - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); - } - - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return gProps; - } - - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java deleted file mode 100644 index c21593e..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; - -/** - * - */ -public abstract class OperatorDescriptorDual implements AbstractOperatorDescriptor { - - protected final FieldList keys1; - protected final FieldList keys2; - - private List<GlobalPropertiesPair> globalProps; - private List<LocalPropertiesPair> localProps; - - protected OperatorDescriptorDual() { - this(null, null); - } - - protected OperatorDescriptorDual(FieldList keys1, FieldList keys2) { - this.keys1 = keys1; - this.keys2 = keys2; - } - - public List<GlobalPropertiesPair> getPossibleGlobalProperties() { - if (this.globalProps == null) { - this.globalProps = createPossibleGlobalProperties(); - } - - return this.globalProps; - } - - public List<LocalPropertiesPair> getPossibleLocalProperties() { - if (this.localProps == null) { - this.localProps = createPossibleLocalProperties(); - } - - return this.localProps; - } - - protected abstract List<GlobalPropertiesPair> createPossibleGlobalProperties(); - - protected abstract List<LocalPropertiesPair> createPossibleLocalProperties(); - - public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2); - - public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2); - - public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node); - - public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2); - - public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2); - - protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) { - - // check number of produced partitioning fields - if(fields1.size() != fields2.size()) { - return false; - } else { - return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size()); - } - } - - protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) { - - // check number of produced partitioning fields - if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) { - return false; - } - else { - for(int i=0; i<numRelevantFields; i++) { - int pField1 = fields1.get(i); - int pField2 = fields2.get(i); - // check if position of both produced fields is the same in both requested fields - int j; - for(j=0; j<this.keys1.size(); j++) { - if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) { - break; - } - else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) { - // do nothing - } - else { - return false; - } - } - if(j == this.keys1.size()) { - throw new CompilerException("Fields were not found in key fields."); - } - } - } - return true; - } - - // -------------------------------------------------------------------------------------------- - - public static final class GlobalPropertiesPair { - - private final RequestedGlobalProperties props1, props2; - - public GlobalPropertiesPair(RequestedGlobalProperties props1, RequestedGlobalProperties props2) { - this.props1 = props1; - this.props2 = props2; - } - - public RequestedGlobalProperties getProperties1() { - return this.props1; - } - - public RequestedGlobalProperties getProperties2() { - return this.props2; - } - - @Override - public int hashCode() { - return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode()); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == GlobalPropertiesPair.class) { - final GlobalPropertiesPair other = (GlobalPropertiesPair) obj; - - return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) && - (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2)); - } - return false; - } - - @Override - public String toString() { - return "{" + this.props1 + " / " + this.props2 + "}"; - } - } - - public static final class LocalPropertiesPair { - - private final RequestedLocalProperties props1, props2; - - public LocalPropertiesPair(RequestedLocalProperties props1, RequestedLocalProperties props2) { - this.props1 = props1; - this.props2 = props2; - } - - public RequestedLocalProperties getProperties1() { - return this.props1; - } - - public RequestedLocalProperties getProperties2() { - return this.props2; - } - - @Override - public int hashCode() { - return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode()); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == LocalPropertiesPair.class) { - final LocalPropertiesPair other = (LocalPropertiesPair) obj; - - return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) && - (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2)); - } - return false; - } - - @Override - public String toString() { - return "{" + this.props1 + " / " + this.props2 + "}"; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java deleted file mode 100644 index c8be5d4..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; - -/** - * Abstract base class for Operator descriptions which instantiates the node and sets the driver - * strategy and the sorting and grouping keys. Returns possible local and global properties and - * updates them after the operation has been performed. - * @see org.apache.flink.compiler.dag.SingleInputNode - */ -public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor { - - protected final FieldSet keys; // the set of key fields - protected final FieldList keyList; // the key fields with ordered field positions - - private List<RequestedGlobalProperties> globalProps; - private List<RequestedLocalProperties> localProps; - - - protected OperatorDescriptorSingle() { - this(null); - } - - protected OperatorDescriptorSingle(FieldSet keys) { - this.keys = keys; - this.keyList = keys == null ? null : keys.toFieldList(); - } - - - public List<RequestedGlobalProperties> getPossibleGlobalProperties() { - if (this.globalProps == null) { - this.globalProps = createPossibleGlobalProperties(); - } - return this.globalProps; - } - - public List<RequestedLocalProperties> getPossibleLocalProperties() { - if (this.localProps == null) { - this.localProps = createPossibleLocalProperties(); - } - return this.localProps; - } - - /** - * Returns a list of global properties that are required by this operator descriptor. - * - * @return A list of global properties that are required by this operator descriptor. - */ - protected abstract List<RequestedGlobalProperties> createPossibleGlobalProperties(); - - /** - * Returns a list of local properties that are required by this operator descriptor. - * - * @return A list of local properties that are required by this operator descriptor. - */ - protected abstract List<RequestedLocalProperties> createPossibleLocalProperties(); - - public abstract SingleInputPlanNode instantiate(Channel in, SingleInputNode node); - - /** - * Returns the global properties which are present after the operator was applied on the - * provided global properties. - * - * @param in The global properties on which the operator is applied. - * @return The global properties which are valid after the operator has been applied. - */ - public abstract GlobalProperties computeGlobalProperties(GlobalProperties in); - - /** - * Returns the local properties which are present after the operator was applied on the - * provided local properties. - * - * @param in The local properties on which the operator is applied. - * @return The local properties which are valid after the operator has been applied. - */ - public abstract LocalProperties computeLocalProperties(LocalProperties in); -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java deleted file mode 100644 index 2bde29b..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.dag.GroupReduceNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -public final class PartialGroupProperties extends OperatorDescriptorSingle { - - public PartialGroupProperties(FieldSet keys) { - super(keys); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.SORTED_GROUP_COMBINE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - // create in input node for combine with same DOP as input node - GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator()); - combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_GROUP_COMBINE); - // sorting key info - combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); - // set grouping comparator key info - combiner.setDriverKeyInfo(this.keyList, 1); - - return combiner; - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - RequestedLocalProperties props = new RequestedLocalProperties(); - props.setGroupedFields(this.keys); - return Collections.singletonList(props); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java deleted file mode 100644 index 5bb51f3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.ReduceNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; - -public final class ReduceProperties extends OperatorDescriptorSingle { - - private final Partitioner<?> customPartitioner; - - public ReduceProperties(FieldSet keys) { - this(keys, null); - } - - public ReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) { - super(keys); - this.customPartitioner = customPartitioner; - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.SORTED_REDUCE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - if (in.getShipStrategy() == ShipStrategyType.FORWARD || - (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) - { - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_REDUCE, this.keyList); - } - else { - // non forward case. all local properties are killed anyways, so we can safely plug in a combiner - Channel toCombiner = new Channel(in.getSource()); - toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - // create an input node for combine with same DOP as input node - ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getOperator().getName()+")", toCombiner, - DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList); - - combiner.setCosts(new Costs(0, 0)); - combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); - - Channel toReducer = new Channel(combiner); - toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); - toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - - return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer, - DriverStrategy.SORTED_REDUCE, this.keyList); - } - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties props = new RequestedGlobalProperties(); - if (customPartitioner == null) { - props.setAnyPartitioning(this.keys); - } else { - props.setCustomPartitioned(this.keys, this.customPartitioner); - } - return Collections.singletonList(props); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - RequestedLocalProperties props = new RequestedLocalProperties(); - props.setGroupedFields(this.keys); - return Collections.singletonList(props); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java deleted file mode 100644 index 1dcd87d..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class SolutionSetDeltaOperator extends OperatorDescriptorSingle { - - public SolutionSetDeltaOperator(FieldList partitioningFields) { - super(partitioningFields); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.UNARY_NO_OP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "SolutionSet Delta", in, DriverStrategy.UNARY_NO_OP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties partProps = new RequestedGlobalProperties(); - partProps.setHashPartitioned(this.keyList); - return Collections.singletonList(partProps); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java deleted file mode 100644 index 356836a..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.util.Utils; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { - - public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) { - super(keys1, keys2); - } - - public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2, - boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) - { - super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.MERGE; - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1)); - RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2)); - return Collections.singletonList(new LocalPropertiesPair(sort1, sort2)); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - int numRelevantFields = this.keys1.size(); - - Ordering prod1 = produced1.getOrdering(); - Ordering prod2 = produced2.getOrdering(); - - if (prod1 == null || prod2 == null) { - throw new CompilerException("The given properties do not meet this operators requirements."); - } - - // check that order of fields is equivalent - if (!checkEquivalentFieldPositionsInKeyFields( - prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { - return false; - } - - // check that both inputs have the same directions of order - for (int i = 0; i < numRelevantFields; i++) { - if (prod1.getOrder(i) != prod2.getOrder(i)) { - return false; - } - } - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections(); - - if (inputOrders == null || inputOrders.length < this.keys1.size()) { - throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator."); - } else if (inputOrders.length > this.keys1.size()) { - boolean[] tmp = new boolean[this.keys1.size()]; - System.arraycopy(inputOrders, 0, tmp, 0, tmp.length); - inputOrders = tmp; - } - - return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders); - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - LocalProperties comb = LocalProperties.combine(in1, in2); - return comb.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java deleted file mode 100644 index c42cff2..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.SinkJoiner; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.SinkJoinerPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.BINARY_NO_OP; - } - - @Override - protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { - // all properties are possible - return Collections.singletonList(new GlobalPropertiesPair( - new RequestedGlobalProperties(), new RequestedGlobalProperties())); - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - // all properties are possible - return Collections.singletonList(new LocalPropertiesPair( - new RequestedLocalProperties(), new RequestedLocalProperties())); - } - - @Override - public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2) { - return true; - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) { - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - if (node instanceof SinkJoiner) { - return new SinkJoinerPlanNode((SinkJoiner) node, in1, in2); - } else { - throw new CompilerException(); - } - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return new LocalProperties(); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { - return GlobalProperties.combine(in1, in2); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java deleted file mode 100644 index bf22fb3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import org.apache.flink.optimizer.dag.BinaryUnionNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * A special subclass for the union to make it identifiable. - */ -public class BinaryUnionPlanNode extends DualInputPlanNode { - - /** - * @param template - */ - public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) { - super(template, "Union", in1, in2, DriverStrategy.UNION); - } - - public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) { - super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(), - DriverStrategy.UNION_WITH_CACHED); - - this.globalProps = toSwapFrom.globalProps; - this.localProps = toSwapFrom.localProps; - this.nodeCosts = toSwapFrom.nodeCosts; - this.cumulativeCosts = toSwapFrom.cumulativeCosts; - - setParallelism(toSwapFrom.getParallelism()); - } - - public BinaryUnionNode getOptimizerNode() { - return (BinaryUnionNode) this.template; - } - - public boolean unionsStaticAndDynamicPath() { - return getInput1().isOnDynamicPath() != getInput2().isOnDynamicPath(); - } - - @Override - public int getMemoryConsumerWeight() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java deleted file mode 100644 index e79e2f3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; - -import java.util.HashMap; - -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.BulkIterationNode; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -public class BulkIterationPlanNode extends SingleInputPlanNode implements IterationPlanNode { - - private final BulkPartialSolutionPlanNode partialSolutionPlanNode; - - private final PlanNode rootOfStepFunction; - - private PlanNode rootOfTerminationCriterion; - - private TypeSerializerFactory<?> serializerForIterationChannel; - - // -------------------------------------------------------------------------------------------- - - public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input, - BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction) - { - super(template, nodeName, input, DriverStrategy.NONE); - this.partialSolutionPlanNode = pspn; - this.rootOfStepFunction = rootOfStepFunction; - - mergeBranchPlanMaps(); - } - - public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input, - BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction, PlanNode rootOfTerminationCriterion) - { - this(template, nodeName, input, pspn, rootOfStepFunction); - this.rootOfTerminationCriterion = rootOfTerminationCriterion; - } - - // -------------------------------------------------------------------------------------------- - - public BulkIterationNode getIterationNode() { - if (this.template instanceof BulkIterationNode) { - return (BulkIterationNode) this.template; - } else { - throw new RuntimeException(); - } - } - - public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() { - return this.partialSolutionPlanNode; - } - - public PlanNode getRootOfStepFunction() { - return this.rootOfStepFunction; - } - - public PlanNode getRootOfTerminationCriterion() { - return this.rootOfTerminationCriterion; - } - - // -------------------------------------------------------------------------------------------- - - - public TypeSerializerFactory<?> getSerializerForIterationChannel() { - return serializerForIterationChannel; - } - - public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) { - this.serializerForIterationChannel = serializerForIterationChannel; - } - - public void setCosts(Costs nodeCosts) { - // add the costs from the step function - nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts()); - - // add the costs for the termination criterion, if it exists - // the costs are divided at branches, so we can simply add them up - if (rootOfTerminationCriterion != null) { - nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts()); - } - - super.setCosts(nodeCosts); - } - - public int getMemoryConsumerWeight() { - return 1; - } - - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - - SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source); - - if (fromOutside == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (fromOutside == FOUND_SOURCE) { - // we always have a dam in the back channel - return FOUND_SOURCE_AND_DAM; - } else { - // check the step function for dams - return this.rootOfStepFunction.hasDamOnPathDownTo(source); - } - } - - @Override - public void acceptForStepFunction(Visitor<PlanNode> visitor) { - this.rootOfStepFunction.accept(visitor); - - if(this.rootOfTerminationCriterion != null) { - this.rootOfTerminationCriterion.accept(visitor); - } - } - - private void mergeBranchPlanMaps() { - for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) { - OptimizerNode brancher = desc.getBranchingNode(); - - if (branchPlan == null) { - branchPlan = new HashMap<OptimizerNode, PlanNode>(6); - } - - if (!branchPlan.containsKey(brancher)) { - PlanNode selectedCandidate = null; - - if (rootOfStepFunction.branchPlan != null) { - selectedCandidate = rootOfStepFunction.branchPlan.get(brancher); - } - - if (selectedCandidate == null) { - throw new CompilerException( - "Candidates for a node with open branches are missing information about the selected candidate "); - } - - this.branchPlan.put(brancher, selectedCandidate); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java deleted file mode 100644 index df05b64..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.Collections; -import java.util.HashMap; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.BulkPartialSolutionNode; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DamBehavior; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.Visitor; - -/** - * Plan candidate node for partial solution of a bulk iteration. - */ -public class BulkPartialSolutionPlanNode extends PlanNode { - - private static final Costs NO_COSTS = new Costs(); - - private BulkIterationPlanNode containingIterationNode; - - private Channel initialInput; - - public Object postPassHelper; - - - public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, String nodeName, - GlobalProperties gProps, LocalProperties lProps, - Channel initialInput) - { - super(template, nodeName, DriverStrategy.NONE); - - this.globalProps = gProps; - this.localProps = lProps; - this.initialInput = initialInput; - - // the partial solution does not cost anything - this.nodeCosts = NO_COSTS; - this.cumulativeCosts = NO_COSTS; - - if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) { - if (this.branchPlan == null) { - this.branchPlan = new HashMap<OptimizerNode, PlanNode>(); - } - - this.branchPlan.putAll(initialInput.getSource().branchPlan); - } - } - - // -------------------------------------------------------------------------------------------- - - public BulkPartialSolutionNode getPartialSolutionNode() { - return (BulkPartialSolutionNode) this.template; - } - - public BulkIterationPlanNode getContainingIterationNode() { - return this.containingIterationNode; - } - - public void setContainingIterationNode(BulkIterationPlanNode containingIterationNode) { - this.containingIterationNode = containingIterationNode; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - visitor.postVisit(this); - } - } - - @Override - public Iterable<PlanNode> getPredecessors() { - return Collections.<PlanNode>emptyList(); - } - - @Override - public Iterable<Channel> getInputs() { - return Collections.<Channel>emptyList(); - } - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source); - if (res == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (res == FOUND_SOURCE) { - return (this.initialInput.getLocalStrategy().dams() || - this.initialInput.getTempMode().breaksPipeline() || - getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ? - FOUND_SOURCE_AND_DAM : FOUND_SOURCE; - } - else { - return NOT_FOUND; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java deleted file mode 100644 index 875d1c3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java +++ /dev/null @@ -1,538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.EstimateProvider; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plandump.DumpableConnection; -import org.apache.flink.optimizer.util.Utils; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; - -/** - * A Channel represents the result produced by an operator and the data exchange - * before the consumption by the target operator. - * - * The channel defines and tracks various properties and characteristics of the - * data set and data exchange. - * - * Data set characteristics: - * <ul> - * <li>The "global properties" of the data, i.e., how the data is distributed across - * partitions</li> - * <li>The "required global properties" of the data, i.e., the global properties that, if absent, - * would cause the program to return a wrong result.</li> - * <li>The "local properties" of the data, i.e., how the data is organized within a partition</li> - * <li>The "required local properties" of the data, i.e., the local properties that, if absent, - * would cause the program to return a wrong result.</li> - * </ul> - * - * Data exchange parameters: - * <ul> - * <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li> - * <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li> - * <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li> - * <li>Several more...</li> - * </ul> - */ -public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> { - - private PlanNode source; - - private PlanNode target; - - private ShipStrategyType shipStrategy = ShipStrategyType.NONE; - - private DataExchangeMode dataExchangeMode; - - private LocalStrategy localStrategy = LocalStrategy.NONE; - - private FieldList shipKeys; - - private FieldList localKeys; - - private boolean[] shipSortOrder; - - private boolean[] localSortOrder; - - private RequestedGlobalProperties requiredGlobalProps; - - private RequestedLocalProperties requiredLocalProps; - - private GlobalProperties globalProps; - - private LocalProperties localProps; - - private TypeSerializerFactory<?> serializer; - - private TypeComparatorFactory<?> shipStrategyComparator; - - private TypeComparatorFactory<?> localStrategyComparator; - - private DataDistribution dataDistribution; - - private Partitioner<?> partitioner; - - private TempMode tempMode; - - private double relativeTempMemory; - - private double relativeMemoryLocalStrategy; - - private int replicationFactor = 1; - - // -------------------------------------------------------------------------------------------- - - public Channel(PlanNode sourceNode) { - this(sourceNode, null); - } - - public Channel(PlanNode sourceNode, TempMode tempMode) { - this.source = sourceNode; - this.tempMode = (tempMode == null ? TempMode.NONE : tempMode); - } - - // -------------------------------------------------------------------------------------------- - // Accessors - // -------------------------------------------------------------------------------------------- - - /** - * Gets the source of this Channel. - * - * @return The source. - */ - @Override - public PlanNode getSource() { - return this.source; - } - - /** - * Sets the target of this Channel. - * - * @param target The target. - */ - public void setTarget(PlanNode target) { - this.target = target; - } - - /** - * Gets the target of this Channel. - * - * @return The target. - */ - public PlanNode getTarget() { - return this.target; - } - - public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) { - setShipStrategy(strategy, null, null, null, dataExchangeMode); - } - - public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) { - setShipStrategy(strategy, keys, null, null, dataExchangeMode); - } - - public void setShipStrategy(ShipStrategyType strategy, FieldList keys, - boolean[] sortDirection, DataExchangeMode dataExchangeMode) { - setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode); - } - - public void setShipStrategy(ShipStrategyType strategy, FieldList keys, - Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) { - setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode); - } - - public void setShipStrategy(ShipStrategyType strategy, FieldList keys, - boolean[] sortDirection, Partitioner<?> partitioner, - DataExchangeMode dataExchangeMode) { - this.shipStrategy = strategy; - this.shipKeys = keys; - this.shipSortOrder = sortDirection; - this.partitioner = partitioner; - this.dataExchangeMode = dataExchangeMode; - this.globalProps = null; // reset the global properties - } - - /** - * Gets the data exchange mode (batch / streaming) to use for the data - * exchange of this channel. - * - * @return The data exchange mode of this channel. - */ - public DataExchangeMode getDataExchangeMode() { - return dataExchangeMode; - } - - public ShipStrategyType getShipStrategy() { - return this.shipStrategy; - } - - public FieldList getShipStrategyKeys() { - return this.shipKeys; - } - - public boolean[] getShipStrategySortOrder() { - return this.shipSortOrder; - } - - public void setLocalStrategy(LocalStrategy strategy) { - setLocalStrategy(strategy, null, null); - } - - public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) { - this.localStrategy = strategy; - this.localKeys = keys; - this.localSortOrder = sortDirection; - this.localProps = null; // reset the local properties - } - - public LocalStrategy getLocalStrategy() { - return this.localStrategy; - } - - public FieldList getLocalStrategyKeys() { - return this.localKeys; - } - - public boolean[] getLocalStrategySortOrder() { - return this.localSortOrder; - } - - public void setDataDistribution(DataDistribution dataDistribution) { - this.dataDistribution = dataDistribution; - } - - public DataDistribution getDataDistribution() { - return this.dataDistribution; - } - - public Partitioner<?> getPartitioner() { - return partitioner; - } - - public TempMode getTempMode() { - return this.tempMode; - } - - /** - * Sets the temp mode of the connection. - * - * @param tempMode - * The temp mode of the connection. - */ - public void setTempMode(TempMode tempMode) { - this.tempMode = tempMode; - } - - /** - * Gets the memory for materializing the channel's result from this Channel. - * - * @return The temp memory. - */ - public double getRelativeTempMemory() { - return this.relativeTempMemory; - } - - /** - * Sets the memory for materializing the channel's result from this Channel. - * - * @param relativeTempMemory The memory for materialization. - */ - public void setRelativeTempMemory(double relativeTempMemory) { - this.relativeTempMemory = relativeTempMemory; - } - - /** - * Sets the replication factor of the connection. - * - * @param factor The replication factor of the connection. - */ - public void setReplicationFactor(int factor) { - this.replicationFactor = factor; - } - - /** - * Returns the replication factor of the connection. - * - * @return The replication factor of the connection. - */ - public int getReplicationFactor() { - return this.replicationFactor; - } - - /** - * Gets the serializer from this Channel. - * - * @return The serializer. - */ - public TypeSerializerFactory<?> getSerializer() { - return serializer; - } - - /** - * Sets the serializer for this Channel. - * - * @param serializer The serializer to set. - */ - public void setSerializer(TypeSerializerFactory<?> serializer) { - this.serializer = serializer; - } - - /** - * Gets the ship strategy comparator from this Channel. - * - * @return The ship strategy comparator. - */ - public TypeComparatorFactory<?> getShipStrategyComparator() { - return shipStrategyComparator; - } - - /** - * Sets the ship strategy comparator for this Channel. - * - * @param shipStrategyComparator The ship strategy comparator to set. - */ - public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) { - this.shipStrategyComparator = shipStrategyComparator; - } - - /** - * Gets the local strategy comparator from this Channel. - * - * @return The local strategy comparator. - */ - public TypeComparatorFactory<?> getLocalStrategyComparator() { - return localStrategyComparator; - } - - /** - * Sets the local strategy comparator for this Channel. - * - * @param localStrategyComparator The local strategy comparator to set. - */ - public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) { - this.localStrategyComparator = localStrategyComparator; - } - - public double getRelativeMemoryLocalStrategy() { - return relativeMemoryLocalStrategy; - } - - public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) { - this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy; - } - - public boolean isOnDynamicPath() { - return this.source.isOnDynamicPath(); - } - - public int getCostWeight() { - return this.source.getCostWeight(); - } - - // -------------------------------------------------------------------------------------------- - // Statistic Estimates - // -------------------------------------------------------------------------------------------- - - - @Override - public long getEstimatedOutputSize() { - long estimate = this.source.template.getEstimatedOutputSize(); - return estimate < 0 ? estimate : estimate * this.replicationFactor; - } - - @Override - public long getEstimatedNumRecords() { - long estimate = this.source.template.getEstimatedNumRecords(); - return estimate < 0 ? estimate : estimate * this.replicationFactor; - } - - @Override - public float getEstimatedAvgWidthPerOutputRecord() { - return this.source.template.getEstimatedAvgWidthPerOutputRecord(); - } - - // -------------------------------------------------------------------------------------------- - // Data Property Handling - // -------------------------------------------------------------------------------------------- - - - public RequestedGlobalProperties getRequiredGlobalProps() { - return requiredGlobalProps; - } - - public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) { - this.requiredGlobalProps = requiredGlobalProps; - } - - public RequestedLocalProperties getRequiredLocalProps() { - return requiredLocalProps; - } - - public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) { - this.requiredLocalProps = requiredLocalProps; - } - - public GlobalProperties getGlobalProperties() { - if (this.globalProps == null) { - this.globalProps = this.source.getGlobalProperties().clone(); - switch (this.shipStrategy) { - case BROADCAST: - this.globalProps.clearUniqueFieldCombinations(); - this.globalProps.setFullyReplicated(); - break; - case PARTITION_HASH: - this.globalProps.setHashPartitioned(this.shipKeys); - break; - case PARTITION_RANGE: - this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder)); - break; - case FORWARD: - break; - case PARTITION_RANDOM: - this.globalProps.reset(); - break; - case PARTITION_FORCED_REBALANCE: - this.globalProps.setForcedRebalanced(); - break; - case PARTITION_CUSTOM: - this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner); - break; - case NONE: - throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set."); - } - } - - return this.globalProps; - } - - public LocalProperties getLocalProperties() { - if (this.localProps == null) { - computeLocalPropertiesAfterShippingOnly(); - switch (this.localStrategy) { - case NONE: - break; - case SORT: - case COMBININGSORT: - this.localProps = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder)); - break; - default: - throw new CompilerException("Unsupported local strategy for channel."); - } - } - - return this.localProps; - } - - private void computeLocalPropertiesAfterShippingOnly() { - switch (this.shipStrategy) { - case BROADCAST: - case PARTITION_HASH: - case PARTITION_CUSTOM: - case PARTITION_RANGE: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - this.localProps = new LocalProperties(); - break; - case FORWARD: - this.localProps = this.source.getLocalProperties(); - break; - case NONE: - throw new CompilerException("ShipStrategy has not yet been set."); - default: - throw new CompilerException("Unknown ShipStrategy."); - } - } - - public void adjustGlobalPropertiesForFullParallelismChange() { - if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) { - throw new IllegalStateException("Cannot adjust channel for degree of parallelism " + - "change before the ship strategy is set."); - } - - // make sure the properties are acquired - if (this.globalProps == null) { - getGlobalProperties(); - } - - // some strategies globally reestablish properties - switch (this.shipStrategy) { - case FORWARD: - throw new CompilerException("Cannot use FORWARD strategy between operations " + - "with different number of parallel instances."); - case NONE: // excluded by sanity check. left here for verification check completion - case BROADCAST: - case PARTITION_HASH: - case PARTITION_RANGE: - case PARTITION_RANDOM: - case PARTITION_FORCED_REBALANCE: - case PARTITION_CUSTOM: - return; - } - throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Utility method used while swapping binary union nodes for n-ary union nodes. - */ - public void swapUnionNodes(PlanNode newUnionNode) { - if (!(this.source instanceof BinaryUnionPlanNode)) { - throw new IllegalStateException(); - } else { - this.source = newUnionNode; - } - } - - // -------------------------------------------------------------------------------------------- - - public int getMaxDepth() { - return this.source.getOptimizerNode().getMaxDepth() + 1; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') + - '[' + this.shipStrategy + "] [" + this.localStrategy + "] " + - (this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode); - } - - @Override - public Channel clone() { - try { - return (Channel) super.clone(); - } catch (CloneNotSupportedException cnsex) { - throw new RuntimeException(cnsex); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java deleted file mode 100644 index 01c56dd..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; -import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; -import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.runtime.operators.DamBehavior; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.util.Visitor; - -/** - * - */ -public class DualInputPlanNode extends PlanNode { - - protected final Channel input1; - protected final Channel input2; - - protected final FieldList keys1; - protected final FieldList keys2; - - protected final boolean[] sortOrders; - - private TypeComparatorFactory<?> comparator1; - private TypeComparatorFactory<?> comparator2; - private TypePairComparatorFactory<?, ?> pairComparator; - - public Object postPassHelper1; - public Object postPassHelper2; - - // -------------------------------------------------------------------------------------------- - - public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy diverStrategy) { - this(template, nodeName, input1, input2, diverStrategy, null, null, null); - } - - public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, - DriverStrategy diverStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2) - { - this(template, nodeName, input1, input2, diverStrategy, driverKeyFields1, driverKeyFields2, - SingleInputPlanNode.getTrueArray(driverKeyFields1.size())); - } - - public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy diverStrategy, - FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders) - { - super(template, nodeName, diverStrategy); - this.input1 = input1; - this.input2 = input2; - this.keys1 = driverKeyFields1; - this.keys2 = driverKeyFields2; - this.sortOrders = driverSortOrders; - - if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input1.setReplicationFactor(getParallelism()); - } - if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input2.setReplicationFactor(getParallelism()); - } - - mergeBranchPlanMaps(input1.getSource(), input2.getSource()); - } - - // -------------------------------------------------------------------------------------------- - - public TwoInputNode getTwoInputNode() { - if (this.template instanceof TwoInputNode) { - return (TwoInputNode) this.template; - } else { - throw new RuntimeException(); - } - } - - public FieldList getKeysForInput1() { - return this.keys1; - } - - public FieldList getKeysForInput2() { - return this.keys2; - } - - public boolean[] getSortOrders() { - return this.sortOrders; - } - - public TypeComparatorFactory<?> getComparator1() { - return this.comparator1; - } - - public TypeComparatorFactory<?> getComparator2() { - return this.comparator2; - } - - public void setComparator1(TypeComparatorFactory<?> comparator) { - this.comparator1 = comparator; - } - - public void setComparator2(TypeComparatorFactory<?> comparator) { - this.comparator2 = comparator; - } - - public TypePairComparatorFactory<?, ?> getPairComparator() { - return this.pairComparator; - } - - public void setPairComparator(TypePairComparatorFactory<?, ?> comparator) { - this.pairComparator = comparator; - } - - /** - * Gets the first input channel to this node. - * - * @return The first input channel to this node. - */ - public Channel getInput1() { - return this.input1; - } - - /** - * Gets the second input channel to this node. - * - * @return The second input channel to this node. - */ - public Channel getInput2() { - return this.input2; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void accept(Visitor<PlanNode> visitor) { - if (visitor.preVisit(this)) { - this.input1.getSource().accept(visitor); - this.input2.getSource().accept(visitor); - - for (Channel broadcastInput : getBroadcastInputs()) { - broadcastInput.getSource().accept(visitor); - } - - visitor.postVisit(this); - } - } - - - @Override - public Iterable<PlanNode> getPredecessors() { - if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) { - return Arrays.asList(this.input1.getSource(), this.input2.getSource()); - } else { - List<PlanNode> preds = new ArrayList<PlanNode>(); - - preds.add(input1.getSource()); - preds.add(input2.getSource()); - - for (Channel c : getBroadcastInputs()) { - preds.add(c.getSource()); - } - - return preds; - } - } - - @Override - public Iterable<Channel> getInputs() { - return Arrays.asList(this.input1, this.input2); - } - - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - if (source == this) { - return FOUND_SOURCE; - } - - // check first input - SourceAndDamReport res1 = this.input1.getSource().hasDamOnPathDownTo(source); - if (res1 == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (res1 == FOUND_SOURCE) { - if (this.input1.getLocalStrategy().dams() || this.input1.getTempMode().breaksPipeline() || - getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) { - return FOUND_SOURCE_AND_DAM; - } else { - return FOUND_SOURCE; - } - } - else { - SourceAndDamReport res2 = this.input2.getSource().hasDamOnPathDownTo(source); - if (res2 == FOUND_SOURCE_AND_DAM) { - return FOUND_SOURCE_AND_DAM; - } - else if (res2 == FOUND_SOURCE) { - if (this.input2.getLocalStrategy().dams() || this.input2.getTempMode().breaksPipeline() || - getDriverStrategy().secondDam() == DamBehavior.FULL_DAM) { - return FOUND_SOURCE_AND_DAM; - } else { - return FOUND_SOURCE; - } - } - else { - // NOT_FOUND - // check the broadcast inputs - - for (NamedChannel nc : getBroadcastInputs()) { - SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source); - if (bcRes != NOT_FOUND) { - // broadcast inputs are always dams - return FOUND_SOURCE_AND_DAM; - } - } - return NOT_FOUND; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java deleted file mode 100644 index d146c83..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -/** - * A common interface for compiled Flink plans for both batch and streaming - * processing programs. - * - */ -public interface FlinkPlan { - -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java deleted file mode 100644 index 38f76b2..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import org.apache.flink.optimizer.dag.IterationNode; -import org.apache.flink.util.Visitor; - -/** - * - */ -public interface IterationPlanNode { - - void acceptForStepFunction(Visitor<PlanNode> visitor); - - IterationNode getIterationNode(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java deleted file mode 100644 index 3650eea..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.BinaryUnionNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.util.IterableIterator; -import org.apache.flink.util.Visitor; - -/** - * A union operation over multiple inputs (2 or more). - */ -public class NAryUnionPlanNode extends PlanNode { - - private final List<Channel> inputs; - - /** - * @param template - */ - public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps, - Costs cumulativeCosts) - { - super(template, "Union", DriverStrategy.NONE); - - this.inputs = inputs; - this.globalProps = gProps; - this.localProps = new LocalProperties(); - this.nodeCosts = new Costs(); - this.cumulativeCosts = cumulativeCosts; - } - - @Override - public void accept(Visitor<PlanNode> visitor) { - visitor.preVisit(this); - for (Channel c : this.inputs) { - c.getSource().accept(visitor); - } - visitor.postVisit(this); - } - - public List<Channel> getListOfInputs() { - return this.inputs; - } - - @Override - public Iterable<Channel> getInputs() { - return Collections.unmodifiableList(this.inputs); - } - - @Override - public Iterable<PlanNode> getPredecessors() { - final Iterator<Channel> channels = this.inputs.iterator(); - return new IterableIterator<PlanNode>() { - - @Override - public boolean hasNext() { - return channels.hasNext(); - } - - @Override - public PlanNode next() { - return channels.next().getSource(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator<PlanNode> iterator() { - return this; - } - }; - } - - @Override - public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { - // this node is used after the plan enumeration. consequently, this will never be invoked here - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java deleted file mode 100644 index da97e61..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.plan; - -import org.apache.flink.optimizer.dag.TempMode; - -public class NamedChannel extends Channel { - - private final String name; - - /** - * Initializes NamedChannel. - * - * @param sourceNode - */ - public NamedChannel(String name, PlanNode sourceNode) { - super(sourceNode); - this.name = name; - } - - public NamedChannel(String name, PlanNode sourceNode, TempMode tempMode) { - super(sourceNode, tempMode); - this.name = name; - } - - public String getName() { - return this.name; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java deleted file mode 100644 index d56be87..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plan; - -import java.util.Collection; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.util.Visitable; -import org.apache.flink.util.Visitor; - -/** - * The execution plan generated by the Optimizer. It contains {@link PlanNode}s - * and {@link Channel}s that describe exactly how the program should be executed. - * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all - * operator strategies (sorting-merge join, hash join, sorted grouping, ...), - * and the data exchange modes (batched, pipelined). - */ -public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> { - - /** The data sources in the plan. */ - private final Collection<SourcePlanNode> dataSources; - - /** The data sinks in the plan. */ - private final Collection<SinkPlanNode> dataSinks; - - /** All nodes in the optimizer plan. */ - private final Collection<PlanNode> allNodes; - - /** The original program. */ - private final Plan originalProgram; - - /** Name of the job */ - private final String jobName; - - /** - * Creates a new instance of this optimizer plan container. The plan is given and fully - * described by the data sources, sinks and the collection of all nodes. - * - * @param sources The data sources. - * @param sinks The data sinks. - * @param allNodes A collection containing all nodes in the plan. - * @param jobName The name of the program - */ - public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks, - Collection<PlanNode> allNodes, String jobName, Plan programPlan) - { - this.dataSources = sources; - this.dataSinks = sinks; - this.allNodes = allNodes; - this.jobName = jobName; - this.originalProgram = programPlan; - } - - /** - * Gets the data sources from this OptimizedPlan. - * - * @return The data sources. - */ - public Collection<SourcePlanNode> getDataSources() { - return dataSources; - } - - /** - * Gets the data sinks from this OptimizedPlan. - * - * @return The data sinks. - */ - public Collection<SinkPlanNode> getDataSinks() { - return dataSinks; - } - - /** - * Gets all the nodes from this OptimizedPlan. - * - * @return All nodes. - */ - public Collection<PlanNode> getAllNodes() { - return allNodes; - } - - /** - * Returns the name of the program. - * - * @return The name of the program. - */ - public String getJobName() { - return this.jobName; - } - - /** - * Gets the original program plan from which this optimized plan was created. - * - * @return The original program plan. - */ - public Plan getOriginalPactPlan() { - return this.originalProgram; - } - - // ------------------------------------------------------------------------ - - /** - * Applies the given visitor top down to all nodes, starting at the sinks. - * - * @param visitor - * The visitor to apply to the nodes in this plan. - * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor) - */ - @Override - public void accept(Visitor<PlanNode> visitor) { - for (SinkPlanNode node : this.dataSinks) { - node.accept(visitor); - } - } -}