http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java deleted file mode 100644 index b3c083a..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.GroupReduceNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -public final class AllGroupWithPartialPreGroupProperties extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.ALL_GROUP_REDUCE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - if (in.getShipStrategy() == ShipStrategyType.FORWARD) { - // locally connected, directly instantiate - return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", - in, DriverStrategy.ALL_GROUP_REDUCE); - } else { - // non forward case.plug in a combiner - Channel toCombiner = new Channel(in.getSource()); - toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - // create an input node for combine with same DOP as input node - GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE); - combiner.setCosts(new Costs(0, 0)); - combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); - - Channel toReducer = new Channel(combiner); - toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); - - toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", - toReducer, DriverStrategy.ALL_GROUP_REDUCE); - } - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java deleted file mode 100644 index a172a60..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.ReduceNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; - -public final class AllReduceProperties extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.ALL_REDUCE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - if (in.getShipStrategy() == ShipStrategyType.FORWARD) { - // locally connected, directly instantiate - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", - in, DriverStrategy.ALL_REDUCE); - } else { - // non forward case.plug in a combiner - Channel toCombiner = new Channel(in.getSource()); - toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - // create an input node for combine with same DOP as input node - ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE); - combiner.setCosts(new Costs(0, 0)); - combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); - - Channel toReducer = new Channel(combiner); - toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); - toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), - in.getLocalStrategySortOrder()); - - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", - toReducer, DriverStrategy.ALL_REDUCE); - } - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - return Collections.singletonList(new RequestedGlobalProperties()); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return new GlobalProperties(); - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return new LocalProperties(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java deleted file mode 100644 index 8cc517e..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.BinaryUnionNode; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.BinaryUnionPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class BinaryUnionOpDescriptor extends OperatorDescriptorDual { - - public BinaryUnionOpDescriptor() { - super(); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.UNION; - } - - @Override - protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { - return Collections.emptyList(); - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - return Collections.emptyList(); - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - return new BinaryUnionPlanNode((BinaryUnionNode) node, in1, in2); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { - GlobalProperties newProps = new GlobalProperties(); - - if (in1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && - in2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && - in1.getPartitioningFields().equals(in2.getPartitioningFields())) - { - newProps.setHashPartitioned(in1.getPartitioningFields()); - } - - return newProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - // all local properties are destroyed - return new LocalProperties(); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) { - return true; - } - - @Override - public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2) { - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java deleted file mode 100644 index f48e297..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; - - -public abstract class CartesianProductDescriptor extends OperatorDescriptorDual { - - private final boolean allowBroadcastFirst; - private final boolean allowBroadcastSecond; - - - protected CartesianProductDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - if (!(allowBroadcastFirst | allowBroadcastSecond)) { - throw new IllegalArgumentException(); - } - - this.allowBroadcastFirst = allowBroadcastFirst; - this.allowBroadcastSecond = allowBroadcastSecond; - } - - - @Override - protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { - ArrayList<GlobalPropertiesPair> pairs = new ArrayList<GlobalPropertiesPair>(); - - if (this.allowBroadcastFirst) { - // replicate first - RequestedGlobalProperties replicated1 = new RequestedGlobalProperties(); - replicated1.setFullyReplicated(); - RequestedGlobalProperties any2 = new RequestedGlobalProperties(); - pairs.add(new GlobalPropertiesPair(replicated1, any2)); - } - - if (this.allowBroadcastSecond) { - // replicate second - RequestedGlobalProperties any1 = new RequestedGlobalProperties(); - RequestedGlobalProperties replicated2 = new RequestedGlobalProperties(); - replicated2.setFullyReplicated(); - pairs.add(new GlobalPropertiesPair(any1, replicated2)); - } - - return pairs; - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - // all properties are possible - return Collections.singletonList(new LocalPropertiesPair( - new RequestedLocalProperties(), new RequestedLocalProperties())); - } - - @Override - public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2) { - return true; - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) { - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { - GlobalProperties gp = GlobalProperties.combine(in1, in2); - if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 && - gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gp.clearUniqueFieldCombinations(); - return gp; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java deleted file mode 100644 index 368944e..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.util.Utils; -import org.apache.flink.runtime.operators.DriverStrategy; - -public class CoGroupDescriptor extends OperatorDescriptorDual { - - private final Ordering ordering1; // ordering on the first input - private final Ordering ordering2; // ordering on the second input - - private Partitioner<?> customPartitioner; - - - public CoGroupDescriptor(FieldList keys1, FieldList keys2) { - this(keys1, keys2, null, null); - } - - public CoGroupDescriptor(FieldList keys1, FieldList keys2, Ordering additionalOrdering1, Ordering additionalOrdering2) { - super(keys1, keys2); - - // if we have an additional ordering, construct the ordering to have primarily the grouping fields - if (additionalOrdering1 != null) { - this.ordering1 = new Ordering(); - for (Integer key : this.keys1) { - this.ordering1.appendOrdering(key, null, Order.ANY); - } - - // and next the additional order fields - for (int i = 0; i < additionalOrdering1.getNumberOfFields(); i++) { - Integer field = additionalOrdering1.getFieldNumber(i); - Order order = additionalOrdering1.getOrder(i); - this.ordering1.appendOrdering(field, additionalOrdering1.getType(i), order); - } - } else { - this.ordering1 = Utils.createOrdering(this.keys1); - } - - // if we have an additional ordering, construct the ordering to have primarily the grouping fields - if (additionalOrdering2 != null) { - this.ordering2 = new Ordering(); - for (Integer key : this.keys2) { - this.ordering2.appendOrdering(key, null, Order.ANY); - } - - // and next the additional order fields - for (int i = 0; i < additionalOrdering2.getNumberOfFields(); i++) { - Integer field = additionalOrdering2.getFieldNumber(i); - Order order = additionalOrdering2.getOrder(i); - this.ordering2.appendOrdering(field, additionalOrdering2.getType(i), order); - } - } else { - this.ordering2 = Utils.createOrdering(this.keys2); - } - } - - public void setCustomPartitioner(Partitioner<?> customPartitioner) { - this.customPartitioner = customPartitioner; - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.CO_GROUP; - } - - @Override - protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { - - if (this.customPartitioner == null) { - - // we accept compatible partitionings of any type - RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties(); - partitioned_left_any.setAnyPartitioning(this.keys1); - partitioned_right_any.setAnyPartitioning(this.keys2); - - // add strict hash partitioning of both inputs on their full key sets - RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties(); - RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties(); - partitioned_left_hash.setHashPartitioned(this.keys1); - partitioned_right_hash.setHashPartitioned(this.keys2); - - return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any), - new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash)); - } - else { - RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties(); - partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner); - - RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties(); - partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner); - - return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right)); - } - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - RequestedLocalProperties sort1 = new RequestedLocalProperties(this.ordering1); - RequestedLocalProperties sort2 = new RequestedLocalProperties(this.ordering2); - return Collections.singletonList(new LocalPropertiesPair(sort1, sort2)); - } - - @Override - public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, - GlobalProperties produced1, GlobalProperties produced2) - { - - if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED && - produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) { - - // both are hash partitioned, check that partitioning fields are equivalently chosen - return checkEquivalentFieldPositionsInKeyFields( - produced1.getPartitioningFields(), produced2.getPartitioningFields()); - - } - else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED && - produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) { - - // both are range partitioned, check that partitioning fields are equivalently chosen - return checkEquivalentFieldPositionsInKeyFields( - produced1.getPartitioningFields(), produced2.getPartitioningFields()); - - } - else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING && - produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) { - - // both use a custom partitioner. Check that both keys are exactly as specified and that both the same partitioner - return produced1.getPartitioningFields().isExactMatch(this.keys1) && - produced2.getPartitioningFields().isExactMatch(this.keys2) && - produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null && - produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()); - - } - else { - - // no other partitioning valid, incl. ANY_PARTITIONING. - // For co-groups we must ensure that both sides are exactly identically partitioned, ANY is not good enough. - return false; - } - - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - int numRelevantFields = this.keys1.size(); - - Ordering prod1 = produced1.getOrdering(); - Ordering prod2 = produced2.getOrdering(); - - if (prod1 == null || prod2 == null) { - throw new CompilerException("The given properties do not meet this operators requirements."); - } - - // check that order of fields is equivalent - if (!checkEquivalentFieldPositionsInKeyFields( - prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { - return false; - } - - // check that order directions are equivalent - for (int i = 0; i < numRelevantFields; i++) { - if (prod1.getOrder(i) != prod2.getOrder(i)) { - return false; - } - } - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections(); - - if (inputOrders == null || inputOrders.length < this.keys1.size()) { - throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator."); - } else if (inputOrders.length > this.keys1.size()) { - boolean[] tmp = new boolean[this.keys1.size()]; - System.arraycopy(inputOrders, 0, tmp, 0, tmp.length); - inputOrders = tmp; - } - - return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { - GlobalProperties gp = GlobalProperties.combine(in1, in2); - if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 && - gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gp.clearUniqueFieldCombinations(); - return gp; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - LocalProperties comb = LocalProperties.combine(in1, in2); - return comb.clearUniqueFieldSets(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java deleted file mode 100644 index 8e7edeb..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.util.Utils; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor { - - public CoGroupWithSolutionSetFirstDescriptor(FieldList keys1, FieldList keys2) { - super(keys1, keys2); - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - RequestedLocalProperties none = new RequestedLocalProperties(); - RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys2)); - return Collections.singletonList(new LocalPropertiesPair(none, sort)); - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - boolean[] inputOrders = in2.getLocalProperties().getOrdering() == null ? null : in2.getLocalProperties().getOrdering().getFieldSortDirections(); - - if (inputOrders == null || inputOrders.length < this.keys2.size()) { - throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator."); - } else if (inputOrders.length > this.keys2.size()) { - boolean[] tmp = new boolean[this.keys2.size()]; - System.arraycopy(inputOrders, 0, tmp, 0, tmp.length); - inputOrders = tmp; - } - - return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - return true; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return in2; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java deleted file mode 100644 index 016eb79..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.util.Utils; - -/** - * - */ -public class CoGroupWithSolutionSetSecondDescriptor extends CoGroupDescriptor { - - public CoGroupWithSolutionSetSecondDescriptor(FieldList keys1, FieldList keys2) { - super(keys1, keys2); - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys1)); - RequestedLocalProperties none = new RequestedLocalProperties(); - return Collections.singletonList(new LocalPropertiesPair(sort, none)); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - return true; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return in1; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java deleted file mode 100644 index bcd4d73..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CollectorMapDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.COLLECTOR_MAP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java deleted file mode 100644 index 4a467dc..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CrossBlockOuterFirstDescriptor extends CartesianProductDescriptor { - - public CrossBlockOuterFirstDescriptor() { - this(true, true); - } - - public CrossBlockOuterFirstDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - super(allowBroadcastFirst, allowBroadcastSecond); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return new LocalProperties(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java deleted file mode 100644 index 212e048..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CrossBlockOuterSecondDescriptor extends CartesianProductDescriptor { - - public CrossBlockOuterSecondDescriptor() { - this(true, true); - } - - public CrossBlockOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - super(allowBroadcastFirst, allowBroadcastSecond); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return new LocalProperties(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java deleted file mode 100644 index 9c7bd63..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor { - - public CrossStreamOuterFirstDescriptor() { - this(true, true); - } - - public CrossStreamOuterFirstDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - super(allowBroadcastFirst, allowBroadcastSecond); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - // uniqueness becomes grouping with streamed nested loops - if ((in1.getGroupedFields() == null || in1.getGroupedFields().size() == 0) && - in1.getUniqueFields() != null && in1.getUniqueFields().size() > 0) - { - return LocalProperties.forGrouping(in1.getUniqueFields().iterator().next().toFieldList()); - } else { - return in1.clearUniqueFieldSets(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java deleted file mode 100644 index 3fabad6..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor { - - public CrossStreamOuterSecondDescriptor() { - this(true, true); - } - - public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { - super(allowBroadcastFirst, allowBroadcastSecond); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - // uniqueness becomes grouping with streamed nested loops - if ((in2.getGroupedFields() == null || in2.getGroupedFields().size() == 0) && - in2.getUniqueFields() != null && in2.getUniqueFields().size() > 0) - { - return LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList()); - } else { - return in2.clearUniqueFieldSets(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java deleted file mode 100644 index 81c823f..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class FilterDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.FLAT_MAP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java deleted file mode 100644 index b915e45..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class FlatMapDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.FLAT_MAP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java deleted file mode 100644 index b648386..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -import java.util.Collections; -import java.util.List; - -/** - * The properties file belonging to the GroupCombineNode. It translates the GroupCombine operation - * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping and sorting keys. - * @see org.apache.flink.optimizer.dag.GroupCombineNode - */ -public final class GroupCombineProperties extends OperatorDescriptorSingle { - - private final Ordering ordering; // ordering that we need to use if an additional ordering is requested - - public GroupCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) { - super(groupKeys); - - // if we have an additional ordering, construct the ordering to have primarily the grouping fields - - this.ordering = new Ordering(); - for (Integer key : this.keyList) { - this.ordering.appendOrdering(key, null, Order.ANY); - } - - // and next the additional order fields - if (additionalOrderKeys != null) { - for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { - Integer field = additionalOrderKeys.getFieldNumber(i); - Order order = additionalOrderKeys.getOrder(i); - this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); - } - } - - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.SORTED_GROUP_COMBINE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - node.setDegreeOfParallelism(in.getSource().getParallelism()); - - // sorting key info - SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode( - node, - "GroupCombine (" + node.getOperator().getName() + ")", - in, // reuse the combine strategy also used in the group reduce - DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); - - // set sorting comparator key info - singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections(), 0); - // set grouping comparator key info - singleInputPlanNode.setDriverKeyInfo(this.keyList, 1); - - return singleInputPlanNode; - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties props = new RequestedGlobalProperties(); - props.setRandomPartitioning(); - return Collections.singletonList(props); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java deleted file mode 100644 index ebd09f2..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -public final class GroupReduceProperties extends OperatorDescriptorSingle { - - private final Ordering ordering; // ordering that we need to use if an additional ordering is requested - - private final Partitioner<?> customPartitioner; - - - public GroupReduceProperties(FieldSet keys) { - this(keys, null, null); - } - - public GroupReduceProperties(FieldSet keys, Ordering additionalOrderKeys) { - this(keys, additionalOrderKeys, null); - } - - public GroupReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) { - this(keys, null, customPartitioner); - } - - public GroupReduceProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) { - super(groupKeys); - - // if we have an additional ordering, construct the ordering to have primarily the grouping fields - if (additionalOrderKeys != null) { - this.ordering = new Ordering(); - for (Integer key : this.keyList) { - this.ordering.appendOrdering(key, null, Order.ANY); - } - - // and next the additional order fields - for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { - Integer field = additionalOrderKeys.getFieldNumber(i); - Order order = additionalOrderKeys.getOrder(i); - this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); - } - } - else { - this.ordering = null; - } - - this.customPartitioner = customPartitioner; - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.SORTED_GROUP_REDUCE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties props = new RequestedGlobalProperties(); - - if (customPartitioner == null) { - props.setAnyPartitioning(this.keys); - } else { - props.setCustomPartitioned(this.keys, this.customPartitioner); - } - return Collections.singletonList(props); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - RequestedLocalProperties props = new RequestedLocalProperties(); - if (this.ordering == null) { - props.setGroupedFields(this.keys); - } else { - props.setOrdering(this.ordering); - } - return Collections.singletonList(props); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java deleted file mode 100644 index c4f47d3..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.optimizer.costs.Costs; -import org.apache.flink.optimizer.dag.GroupReduceNode; -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.PartitioningProperty; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; - -public final class GroupReduceWithCombineProperties extends OperatorDescriptorSingle { - - private final Ordering ordering; // ordering that we need to use if an additional ordering is requested - - private final Partitioner<?> customPartitioner; - - - public GroupReduceWithCombineProperties(FieldSet groupKeys) { - this(groupKeys, null, null); - } - - public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) { - this(groupKeys, additionalOrderKeys, null); - } - - public GroupReduceWithCombineProperties(FieldSet groupKeys, Partitioner<?> customPartitioner) { - this(groupKeys, null, customPartitioner); - } - - public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) { - super(groupKeys); - - // if we have an additional ordering, construct the ordering to have primarily the grouping fields - if (additionalOrderKeys != null) { - this.ordering = new Ordering(); - for (Integer key : this.keyList) { - this.ordering.appendOrdering(key, null, Order.ANY); - } - - // and next the additional order fields - for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { - Integer field = additionalOrderKeys.getFieldNumber(i); - Order order = additionalOrderKeys.getOrder(i); - this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); - } - } else { - this.ordering = null; - } - - this.customPartitioner = customPartitioner; - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.SORTED_GROUP_REDUCE; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - if (in.getShipStrategy() == ShipStrategyType.FORWARD) { - // adjust a sort (changes grouping, so it must be for this driver to combining sort - if (in.getLocalStrategy() == LocalStrategy.SORT) { - if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { - throw new RuntimeException("Bug: Inconsistent sort for group strategy."); - } - in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), - in.getLocalStrategySortOrder()); - } - return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); - } else { - // non forward case. all local properties are killed anyways, so we can safely plug in a combiner - Channel toCombiner = new Channel(in.getSource()); - toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - - // create an input node for combine with same DOP as input node - GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() - .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); - combiner.setCosts(new Costs(0, 0)); - combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); - // set sorting comparator key info - combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); - // set grouping comparator key info - combiner.setDriverKeyInfo(this.keyList, 1); - - Channel toReducer = new Channel(combiner); - toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), - in.getShipStrategySortOrder(), in.getDataExchangeMode()); - toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), - in.getLocalStrategySortOrder()); - - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", - toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); - } - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties props = new RequestedGlobalProperties(); - if (customPartitioner == null) { - props.setAnyPartitioning(this.keys); - } else { - props.setCustomPartitioned(this.keys, this.customPartitioner); - } - return Collections.singletonList(props); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - RequestedLocalProperties props = new RequestedLocalProperties(); - if (this.ordering == null) { - props.setGroupedFields(this.keys); - } else { - props.setOrdering(this.ordering); - } - return Collections.singletonList(props); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && - gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) - { - gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); - } - gProps.clearUniqueFieldCombinations(); - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps.clearUniqueFieldSets(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java deleted file mode 100644 index fec72a9..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -/** - * - */ -public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { - - public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) { - super(keys1, keys2); - } - - public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2, - boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) - { - super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.HYBRIDHASH_BUILD_FIRST; - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - // all properties are possible - return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - DriverStrategy strategy; - - if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) { - // sanity check that the first input is cached and remove that cache - if (!in1.getTempMode().isCached()) { - throw new CompilerException("No cache at point where static and dynamic parts meet."); - } - in1.setTempMode(in1.getTempMode().makeNonCached()); - strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED; - } - else { - strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST; - } - return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return new LocalProperties(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java deleted file mode 100644 index f9d1e6c..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TwoInputNode; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - -public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor { - - public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) { - super(keys1, keys2); - } - - public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2, - boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) - { - super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); - } - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.HYBRIDHASH_BUILD_SECOND; - } - - @Override - protected List<LocalPropertiesPair> createPossibleLocalProperties() { - // all properties are possible - return Collections.singletonList(new LocalPropertiesPair( - new RequestedLocalProperties(), new RequestedLocalProperties())); - } - - @Override - public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, - LocalProperties produced1, LocalProperties produced2) - { - return true; - } - - @Override - public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - DriverStrategy strategy; - - if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) { - // sanity check that the first input is cached and remove that cache - if (!in2.getTempMode().isCached()) { - throw new CompilerException("No cache at point where static and dynamic parts meet."); - } - - in2.setTempMode(in2.getTempMode().makeNonCached()); - strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED; - } - else { - strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND; - } - return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { - return new LocalProperties(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java deleted file mode 100644 index 9f14d2a..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class MapDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.MAP; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java deleted file mode 100644 index 1489097..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.operators; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.optimizer.dag.SingleInputNode; -import org.apache.flink.optimizer.dataproperties.GlobalProperties; -import org.apache.flink.optimizer.dataproperties.LocalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; -import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; - - -public class MapPartitionDescriptor extends OperatorDescriptorSingle { - - @Override - public DriverStrategy getStrategy() { - return DriverStrategy.MAP_PARTITION; - } - - @Override - public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION); - } - - @Override - protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { - RequestedGlobalProperties rgp = new RequestedGlobalProperties(); - rgp.setAnyDistribution(); - return Collections.singletonList(rgp); - } - - @Override - protected List<RequestedLocalProperties> createPossibleLocalProperties() { - return Collections.singletonList(new RequestedLocalProperties()); - } - - @Override - public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { - return gProps; - } - - @Override - public LocalProperties computeLocalProperties(LocalProperties lProps) { - return lProps; - } -}