http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/NepheleJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/NepheleJobGraphGenerator.java deleted file mode 100644 index ed6f5ca..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/NepheleJobGraphGenerator.java +++ /dev/null @@ -1,1581 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.optimizer.plantranslate; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.aggregators.AggregatorRegistry; -import org.apache.flink.api.common.aggregators.AggregatorWithName; -import org.apache.flink.api.common.aggregators.ConvergenceCriterion; -import org.apache.flink.api.common.aggregators.LongSumAggregator; -import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.IterationPlanNode; -import org.apache.flink.optimizer.plan.NAryUnionPlanNode; -import org.apache.flink.optimizer.plan.NamedChannel; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.SolutionSetPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plan.WorksetPlanNode; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.io.network.DataExchangeMode; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion; -import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; -import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask; -import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask; -import org.apache.flink.runtime.iterative.task.IterationTailPactTask; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.InputFormatVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.OutputFormatVertex; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.operators.CoGroupDriver; -import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver; -import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver; -import org.apache.flink.runtime.operators.DataSinkTask; -import org.apache.flink.runtime.operators.DataSourceTask; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver; -import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver; -import org.apache.flink.runtime.operators.MatchDriver; -import org.apache.flink.runtime.operators.NoOpDriver; -import org.apache.flink.runtime.operators.RegularPactTask; -import org.apache.flink.runtime.operators.chaining.ChainedDriver; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Visitor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -/** - * This component translates the optimizer's resulting plan a nephele job graph. The - * translation is a one to one mapping. All decisions are made by the optimizer, this class - * simply creates nephele data structures and descriptions corresponding to the optimizer's - * result. - * <p> - * The basic method of operation is a top down traversal over the plan graph. On the way down, tasks are created - * for the plan nodes, on the way back up, the nodes connect their predecessor. - */ -public class NepheleJobGraphGenerator implements Visitor<PlanNode> { - - public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux"; - - private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false); - -// private static final Logger LOG = LoggerFactory.getLogger(NepheleJobGraphGenerator.class); - - private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null); - - // ------------------------------------------------------------------------ - - private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to nephele vertices - - private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to nephele vertices - - private Map<IterationPlanNode, IterationDescriptor> iterations; - - private List<TaskInChain> chainedTasksInSequence; - - private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation - - private final int defaultMaxFan; - - private final float defaultSortSpillingThreshold; - - private int iterationIdEnumerator = 1; - - private IterationPlanNode currentIteration; // the current the enclosing iteration - - private List<IterationPlanNode> iterationStack; // stack of enclosing iterations - - private SlotSharingGroup sharingGroup; - - // ------------------------------------------------------------------------ - - /** - * Creates a new job graph generator that uses the default values for its resource configuration. - */ - public NepheleJobGraphGenerator() { - this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN; - this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD; - } - - public NepheleJobGraphGenerator(Configuration config) { - this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, - ConfigConstants.DEFAULT_SPILLING_MAX_FAN); - this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY, - ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD); - } - - /** - * Translates a {@link org.apache.flink.optimizer.plan.OptimizedPlan} into a - * {@link org.apache.flink.runtime.jobgraph.JobGraph}. - * This is an 1-to-1 mapping. No optimization whatsoever is applied. - * - * @param program - * Optimized PACT plan that is translated into a JobGraph. - * @return JobGraph generated from PACT plan. - */ - public JobGraph compileJobGraph(OptimizedPlan program) { - this.vertices = new HashMap<PlanNode, AbstractJobVertex>(); - this.chainedTasks = new HashMap<PlanNode, TaskInChain>(); - this.chainedTasksInSequence = new ArrayList<TaskInChain>(); - this.auxVertices = new ArrayList<AbstractJobVertex>(); - this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>(); - this.iterationStack = new ArrayList<IterationPlanNode>(); - - this.sharingGroup = new SlotSharingGroup(); - - // generate Nephele job graph - program.accept(this); - - // sanity check that we are not somehow in an iteration at the end - if (this.currentIteration != null) { - throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration."); - } - - // finalize the iterations - for (IterationDescriptor iteration : this.iterations.values()) { - if (iteration.getIterationNode() instanceof BulkIterationPlanNode) { - finalizeBulkIteration(iteration); - } else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) { - finalizeWorksetIteration(iteration); - } else { - throw new CompilerException(); - } - } - - // now that the traversal is done, we have the chained tasks write their configs into their - // parents' configurations - for (int i = 0; i < this.chainedTasksInSequence.size(); i++) { - TaskInChain tic = this.chainedTasksInSequence.get(i); - TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration()); - t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName()); - } - - // create the jobgraph object - JobGraph graph = new JobGraph(program.getJobName()); - graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries()); - graph.setAllowQueuedScheduling(false); - - // add vertices to the graph - for (AbstractJobVertex vertex : this.vertices.values()) { - graph.addVertex(vertex); - } - - for (AbstractJobVertex vertex : this.auxVertices) { - graph.addVertex(vertex); - vertex.setSlotSharingGroup(sharingGroup); - } - - // add registered cache file into job configuration - for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) { - DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration()); - } - - try { - InstantiationUtil.writeObjectToConfig( - program.getOriginalPactPlan().getExecutionConfig(), - graph.getJobConfiguration(), - ExecutionConfig.CONFIG_KEY); - } catch (IOException e) { - throw new RuntimeException("Config object could not be written to Job Configuration: " + e); - } - - // release all references again - this.vertices = null; - this.chainedTasks = null; - this.chainedTasksInSequence = null; - this.auxVertices = null; - this.iterations = null; - this.iterationStack = null; - - // return job graph - return graph; - } - - /** - * This methods implements the pre-visiting during a depth-first traversal. It create the job vertex and - * sets local strategy. - * - * @param node - * The node that is currently processed. - * @return True, if the visitor should descend to the node's children, false if not. - * @see org.apache.flink.util.Visitor#preVisit(org.apache.flink.util.Visitable) - */ - @Override - public boolean preVisit(PlanNode node) { - // check if we have visited this node before. in non-tree graphs, this happens - if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) { - // return false to prevent further descend - return false; - } - - // the vertex to be created for the current node - final AbstractJobVertex vertex; - try { - if (node instanceof SinkPlanNode) { - vertex = createDataSinkVertex((SinkPlanNode) node); - } - else if (node instanceof SourcePlanNode) { - vertex = createDataSourceVertex((SourcePlanNode) node); - } - else if (node instanceof BulkIterationPlanNode) { - BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node; - // for the bulk iteration, we skip creating anything for now. we create the graph - // for the step function in the post visit. - - // check that the root of the step function has the same DOP as the iteration. - // because the tail must have the same DOP as the head, we can only merge the last - // operator with the tail, if they have the same DOP. not merging is currently not - // implemented - PlanNode root = iterationNode.getRootOfStepFunction(); - if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism()) - { - throw new CompilerException("Error: The final operator of the step " + - "function has a different degree of parallelism than the iteration operator itself."); - } - - IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++); - this.iterations.put(iterationNode, descr); - vertex = null; - } - else if (node instanceof WorksetIterationPlanNode) { - WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node; - - // we have the same constraints as for the bulk iteration - PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode(); - PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode(); - - if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism()) - { - throw new CompilerException("It is currently not supported that the final operator of the step " + - "function has a different degree of parallelism than the iteration operator itself."); - } - if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism()) - { - throw new CompilerException("It is currently not supported that the final operator of the step " + - "function has a different degree of parallelism than the iteration operator itself."); - } - - IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++); - this.iterations.put(iterationNode, descr); - vertex = null; - } - else if (node instanceof SingleInputPlanNode) { - vertex = createSingleInputVertex((SingleInputPlanNode) node); - } - else if (node instanceof DualInputPlanNode) { - vertex = createDualInputVertex((DualInputPlanNode) node); - } - else if (node instanceof NAryUnionPlanNode) { - // skip the union for now - vertex = null; - } - else if (node instanceof BulkPartialSolutionPlanNode) { - // create a head node (or not, if it is merged into its successor) - vertex = createBulkIterationHead((BulkPartialSolutionPlanNode) node); - } - else if (node instanceof SolutionSetPlanNode) { - // this represents an access into the solution set index. - // we do not create a vertex for the solution set here (we create the head at the workset place holder) - - // we adjust the joins / cogroups that go into the solution set here - for (Channel c : node.getOutgoingChannels()) { - DualInputPlanNode target = (DualInputPlanNode) c.getTarget(); - AbstractJobVertex accessingVertex = this.vertices.get(target); - TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration()); - int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1; - - // sanity checks - if (inputNum == -1) { - throw new CompilerException(); - } - - // adjust the driver - if (conf.getDriver().equals(MatchDriver.class)) { - conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class); - } - else if (conf.getDriver().equals(CoGroupDriver.class)) { - conf.setDriver(inputNum == 0 ? CoGroupWithSolutionSetFirstDriver.class : CoGroupWithSolutionSetSecondDriver.class); - } - else { - throw new CompilerException("Found join with solution set using incompatible operator (only Join/CoGroup are valid)."); - } - } - - // make sure we do not visit this node again. for that, we add a 'already seen' entry into one of the sets - this.chainedTasks.put(node, ALREADY_VISITED_PLACEHOLDER); - - vertex = null; - } - else if (node instanceof WorksetPlanNode) { - // create the iteration head here - vertex = createWorksetIterationHead((WorksetPlanNode) node); - } - else { - throw new CompilerException("Unrecognized node type: " + node.getClass().getName()); - } - } - catch (Exception e) { - throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e); - } - - // check if a vertex was created, or if it was chained or skipped - if (vertex != null) { - // set degree of parallelism - int pd = node.getDegreeOfParallelism(); - vertex.setParallelism(pd); - - vertex.setSlotSharingGroup(sharingGroup); - - // check whether this vertex is part of an iteration step function - if (this.currentIteration != null) { - // check that the task has the same DOP as the iteration as such - PlanNode iterationNode = (PlanNode) this.currentIteration; - if (iterationNode.getDegreeOfParallelism() < pd) { - throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator."); - } - - // store the id of the iterations the step functions participate in - IterationDescriptor descr = this.iterations.get(this.currentIteration); - new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId()); - } - - // store in the map - this.vertices.put(node, vertex); - } - - // returning true causes deeper descend - return true; - } - - /** - * This method implements the post-visit during the depth-first traversal. When the post visit happens, - * all of the descendants have been processed, so this method connects all of the current node's - * predecessors to the current node. - * - * @param node - * The node currently processed during the post-visit. - * @see org.apache.flink.util.Visitor#postVisit(org.apache.flink.util.Visitable) t - */ - @Override - public void postVisit(PlanNode node) { - try { - // --------- check special cases for which we handle post visit differently ---------- - - // skip data source node (they have no inputs) - // also, do nothing for union nodes, we connect them later when gathering the inputs for a task - // solution sets have no input. the initial solution set input is connected when the iteration node is in its postVisit - if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) { - return; - } - - // check if we have an iteration. in that case, translate the step function now - if (node instanceof IterationPlanNode) { - // prevent nested iterations - if (node.isOnDynamicPath()) { - throw new CompilerException("Nested Iterations are not possible at the moment!"); - } - - // if we recursively go into an iteration (because the constant path of one iteration contains - // another one), we push the current one onto the stack - if (this.currentIteration != null) { - this.iterationStack.add(this.currentIteration); - } - - this.currentIteration = (IterationPlanNode) node; - this.currentIteration.acceptForStepFunction(this); - - // pop the current iteration from the stack - if (this.iterationStack.isEmpty()) { - this.currentIteration = null; - } else { - this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1); - } - - // inputs for initial bulk partial solution or initial workset are already connected to the iteration head in the head's post visit. - // connect the initial solution set now. - if (node instanceof WorksetIterationPlanNode) { - // connect the initial solution set - WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node; - AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask(); - TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration()); - int inputIndex = headConfig.getDriverStrategy().getNumInputs(); - headConfig.setIterationHeadSolutionSetInputIndex(inputIndex); - translateChannel(wsNode.getInitialSolutionSetInput(), inputIndex, headVertex, headConfig, false); - } - - return; - } - - final AbstractJobVertex targetVertex = this.vertices.get(node); - - - // --------- Main Path: Translation of channels ---------- - // - // There are two paths of translation: One for chained tasks (or merged tasks in general), - // which do not have their own task vertex. The other for tasks that have their own vertex, - // or are the primary task in a vertex (to which the others are chained). - - // check whether this node has its own task, or is merged with another one - if (targetVertex == null) { - // node's task is merged with another task. it is either chained, of a merged head vertex - // from an iteration - final TaskInChain chainedTask; - if ((chainedTask = this.chainedTasks.get(node)) != null) { - // Chained Task. Sanity check first... - final Iterator<Channel> inConns = node.getInputs().iterator(); - if (!inConns.hasNext()) { - throw new CompilerException("Bug: Found chained task with no input."); - } - final Channel inConn = inConns.next(); - - if (inConns.hasNext()) { - throw new CompilerException("Bug: Found a chained task with more than one input!"); - } - if (inConn.getLocalStrategy() != null && inConn.getLocalStrategy() != LocalStrategy.NONE) { - throw new CompilerException("Bug: Found a chained task with an input local strategy."); - } - if (inConn.getShipStrategy() != null && inConn.getShipStrategy() != ShipStrategyType.FORWARD) { - throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD."); - } - - AbstractJobVertex container = chainedTask.getContainingVertex(); - - if (container == null) { - final PlanNode sourceNode = inConn.getSource(); - container = this.vertices.get(sourceNode); - if (container == null) { - // predecessor is itself chained - container = this.chainedTasks.get(sourceNode).getContainingVertex(); - if (container == null) { - throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex."); - } - } else { - // predecessor is a proper task job vertex and this is the first chained task. add a forward connection entry. - new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD); - } - chainedTask.setContainingVertex(container); - } - - // add info about the input serializer type - chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0); - - // update name of container task - String containerTaskName = container.getName(); - if(containerTaskName.startsWith("CHAIN ")) { - container.setName(containerTaskName+" -> "+chainedTask.getTaskName()); - } else { - container.setName("CHAIN "+containerTaskName+" -> "+chainedTask.getTaskName()); - } - - this.chainedTasksInSequence.add(chainedTask); - return; - } - else if (node instanceof BulkPartialSolutionPlanNode || - node instanceof WorksetPlanNode) - { - // merged iteration head task. the task that the head is merged with will take care of it - return; - } else { - throw new CompilerException("Bug: Unrecognized merged task vertex."); - } - } - - // -------- Here, we translate non-chained tasks ------------- - - - if (this.currentIteration != null) { - AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask(); - // the head may still be null if we descend into the static parts first - if (head != null) { - targetVertex.setStrictlyCoLocatedWith(head); - } - } - - - // create the config that will contain all the description of the inputs - final TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration()); - - // get the inputs. if this node is the head of an iteration, we obtain the inputs from the - // enclosing iteration node, because the inputs are the initial inputs to the iteration. - final Iterator<Channel> inConns; - if (node instanceof BulkPartialSolutionPlanNode) { - inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator(); - // because the partial solution has its own vertex, is has only one (logical) input. - // note this in the task configuration - targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0); - } else if (node instanceof WorksetPlanNode) { - WorksetPlanNode wspn = (WorksetPlanNode) node; - // input that is the initial workset - inConns = Collections.singleton(wspn.getContainingIterationNode().getInput2()).iterator(); - - // because we have a stand-alone (non-merged) workset iteration head, the initial workset will - // be input 0 and the solution set will be input 1 - targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0); - targetVertexConfig.setIterationHeadSolutionSetInputIndex(1); - } else { - inConns = node.getInputs().iterator(); - } - if (!inConns.hasNext()) { - throw new CompilerException("Bug: Found a non-source task with no input."); - } - - int inputIndex = 0; - while (inConns.hasNext()) { - Channel input = inConns.next(); - inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false); - } - // broadcast variables - int broadcastInputIndex = 0; - for (NamedChannel broadcastInput: node.getBroadcastInputs()) { - int broadcastInputIndexDelta = translateChannel(broadcastInput, broadcastInputIndex, targetVertex, targetVertexConfig, true); - targetVertexConfig.setBroadcastInputName(broadcastInput.getName(), broadcastInputIndex); - targetVertexConfig.setBroadcastInputSerializer(broadcastInput.getSerializer(), broadcastInputIndex); - broadcastInputIndex += broadcastInputIndexDelta; - } - } catch (Exception e) { - throw new CompilerException( - "An error occurred while translating the optimized plan to a nephele JobGraph: " + e.getMessage(), e); - } - } - - private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex, - TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception - { - final PlanNode inputPlanNode = input.getSource(); - final Iterator<Channel> allInChannels; - - if (inputPlanNode instanceof NAryUnionPlanNode) { - allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator(); - } - else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) { - if (this.vertices.get(inputPlanNode) == null) { - // merged iteration head - final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) inputPlanNode; - final BulkIterationPlanNode iterationNode = pspn.getContainingIterationNode(); - - // check if the iteration's input is a union - if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) { - allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator(); - } else { - allInChannels = Collections.singletonList(iterationNode.getInput()).iterator(); - } - - // also, set the index of the gate with the partial solution - targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex); - } else { - // standalone iteration head - allInChannels = Collections.singletonList(input).iterator(); - } - } else if (inputPlanNode instanceof WorksetPlanNode) { - if (this.vertices.get(inputPlanNode) == null) { - // merged iteration head - final WorksetPlanNode wspn = (WorksetPlanNode) inputPlanNode; - final WorksetIterationPlanNode iterationNode = wspn.getContainingIterationNode(); - - // check if the iteration's input is a union - if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) { - allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator(); - } else { - allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator(); - } - - // also, set the index of the gate with the partial solution - targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex); - } else { - // standalone iteration head - allInChannels = Collections.singletonList(input).iterator(); - } - } else if (inputPlanNode instanceof SolutionSetPlanNode) { - // for now, skip connections with the solution set node, as this is a local index access (later to be parameterized here) - // rather than a vertex connection - return 0; - } else { - allInChannels = Collections.singletonList(input).iterator(); - } - - // check that the type serializer is consistent - TypeSerializerFactory<?> typeSerFact = null; - - // accounting for channels on the dynamic path - int numChannelsTotal = 0; - int numChannelsDynamicPath = 0; - int numDynamicSenderTasksTotal = 0; - - - // expand the channel to all the union channels, in case there is a union operator at its source - while (allInChannels.hasNext()) { - final Channel inConn = allInChannels.next(); - - // sanity check the common serializer - if (typeSerFact == null) { - typeSerFact = inConn.getSerializer(); - } else if (!typeSerFact.equals(inConn.getSerializer())) { - throw new CompilerException("Conflicting types in union operator."); - } - - final PlanNode sourceNode = inConn.getSource(); - AbstractJobVertex sourceVertex = this.vertices.get(sourceNode); - TaskConfig sourceVertexConfig; - - if (sourceVertex == null) { - // this predecessor is chained to another task or an iteration - final TaskInChain chainedTask; - final IterationDescriptor iteration; - if ((chainedTask = this.chainedTasks.get(sourceNode)) != null) { - // push chained task - if (chainedTask.getContainingVertex() == null) { - throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting."); - } - sourceVertex = chainedTask.getContainingVertex(); - sourceVertexConfig = chainedTask.getTaskConfig(); - } else if ((iteration = this.iterations.get(sourceNode)) != null) { - // predecessor is an iteration - sourceVertex = iteration.getHeadTask(); - sourceVertexConfig = iteration.getHeadFinalResultConfig(); - } else { - throw new CompilerException("Bug: Could not resolve source node for a channel."); - } - } else { - // predecessor is its own vertex - sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration()); - } - DistributionPattern pattern = connectJobVertices( - inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast); - - // accounting on channels and senders - numChannelsTotal++; - if (inConn.isOnDynamicPath()) { - numChannelsDynamicPath++; - numDynamicSenderTasksTotal += getNumberOfSendersPerReceiver(pattern, - sourceVertex.getParallelism(), targetVertex.getParallelism()); - } - } - - // for the iterations, check that the number of dynamic channels is the same as the number - // of channels for this logical input. this condition is violated at the moment, if there - // is a union between nodes on the static and nodes on the dynamic path - if (numChannelsDynamicPath > 0 && numChannelsTotal != numChannelsDynamicPath) { - throw new CompilerException("Error: It is currently not supported to union between dynamic and static path in an iteration."); - } - if (numDynamicSenderTasksTotal > 0) { - if (isBroadcast) { - targetVertexConfig.setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal); - } else { - targetVertexConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal); - } - } - - // the local strategy is added only once. in non-union case that is the actual edge, - // in the union case, it is the edge between union and the target node - addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast); - return 1; - } - - private int getNumberOfSendersPerReceiver(DistributionPattern pattern, int numSenders, int numReceivers) { - if (pattern == DistributionPattern.ALL_TO_ALL) { - return numSenders; - } else if (pattern == DistributionPattern.POINTWISE) { - if (numSenders != numReceivers) { - if (numReceivers == 1) { - return numSenders; - } - else if (numSenders == 1) { - return 1; - } - else { - throw new CompilerException("Error: A changing degree of parallelism is currently " + - "not supported between tasks within an iteration."); - } - } else { - return 1; - } - } else { - throw new CompilerException("Unknown distribution pattern for channels: " + pattern); - } - } - - // ------------------------------------------------------------------------ - // Methods for creating individual vertices - // ------------------------------------------------------------------------ - - private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException { - final String taskName = node.getNodeName(); - final DriverStrategy ds = node.getDriverStrategy(); - - // check, whether chaining is possible - boolean chaining = false; - { - Channel inConn = node.getInput(); - PlanNode pred = inConn.getSource(); - chaining = ds.getPushChainDriverClass() != null && - !(pred instanceof NAryUnionPlanNode) && // first op after union is stand-alone, because union is merged - !(pred instanceof BulkPartialSolutionPlanNode) && // partial solution merges anyways - !(pred instanceof WorksetPlanNode) && // workset merges anyways - !(pred instanceof IterationPlanNode) && // cannot chain with iteration heads currently - inConn.getShipStrategy() == ShipStrategyType.FORWARD && - inConn.getLocalStrategy() == LocalStrategy.NONE && - pred.getOutgoingChannels().size() == 1 && - node.getDegreeOfParallelism() == pred.getDegreeOfParallelism() && - node.getBroadcastInputs().isEmpty(); - - // cannot chain the nodes that produce the next workset or the next solution set, if they are not the - // in a tail - if (this.currentIteration != null && this.currentIteration instanceof WorksetIterationPlanNode && - node.getOutgoingChannels().size() > 0) - { - WorksetIterationPlanNode wspn = (WorksetIterationPlanNode) this.currentIteration; - if (wspn.getSolutionSetDeltaPlanNode() == pred || wspn.getNextWorkSetPlanNode() == pred) { - chaining = false; - } - } - // cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows - if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode) - { - BulkIterationPlanNode wspn = (BulkIterationPlanNode) this.currentIteration; - if (node == wspn.getRootOfTerminationCriterion() && wspn.getRootOfStepFunction() == pred){ - chaining = false; - }else if(node.getOutgoingChannels().size() > 0 &&(wspn.getRootOfStepFunction() == pred || - wspn.getRootOfTerminationCriterion() == pred)) { - chaining = false; - } - } - } - - final AbstractJobVertex vertex; - final TaskConfig config; - - if (chaining) { - vertex = null; - config = new TaskConfig(new Configuration()); - this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName)); - } else { - // create task vertex - vertex = new AbstractJobVertex(taskName); - vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class); - - config = new TaskConfig(vertex.getConfiguration()); - config.setDriver(ds.getDriverClass()); - } - - // set user code - config.setStubWrapper(node.getPactContract().getUserCodeWrapper()); - config.setStubParameters(node.getPactContract().getParameters()); - - // set the driver strategy - config.setDriverStrategy(ds); - for(int i=0;i<ds.getNumRequiredComparators();i++) { - config.setDriverComparator(node.getComparator(i), i); - } - // assign memory, file-handles, etc. - assignDriverResources(node, config); - return vertex; - } - - private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException { - final String taskName = node.getNodeName(); - final DriverStrategy ds = node.getDriverStrategy(); - final AbstractJobVertex vertex = new AbstractJobVertex(taskName); - final TaskConfig config = new TaskConfig(vertex.getConfiguration()); - vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class); - - // set user code - config.setStubWrapper(node.getPactContract().getUserCodeWrapper()); - config.setStubParameters(node.getPactContract().getParameters()); - - // set the driver strategy - config.setDriver(ds.getDriverClass()); - config.setDriverStrategy(ds); - if (node.getComparator1() != null) { - config.setDriverComparator(node.getComparator1(), 0); - } - if (node.getComparator2() != null) { - config.setDriverComparator(node.getComparator2(), 1); - } - if (node.getPairComparator() != null) { - config.setDriverPairComparator(node.getPairComparator()); - } - - // assign memory, file-handles, etc. - assignDriverResources(node, config); - return vertex; - } - - private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException { - final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName()); - final TaskConfig config = new TaskConfig(vertex.getConfiguration()); - - vertex.setInvokableClass(DataSourceTask.class); - vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper())); - - // set user code - config.setStubWrapper(node.getPactContract().getUserCodeWrapper()); - config.setStubParameters(node.getPactContract().getParameters()); - - config.setOutputSerializer(node.getSerializer()); - return vertex; - } - - private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException { - final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName()); - final TaskConfig config = new TaskConfig(vertex.getConfiguration()); - - vertex.setInvokableClass(DataSinkTask.class); - vertex.setFormatDescription(getDescriptionForUserCode(node.getPactContract().getUserCodeWrapper())); - - // set user code - config.setStubWrapper(node.getPactContract().getUserCodeWrapper()); - config.setStubParameters(node.getPactContract().getParameters()); - - return vertex; - } - - private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) { - // get the bulk iteration that corresponds to this partial solution node - final BulkIterationPlanNode iteration = pspn.getContainingIterationNode(); - - // check whether we need an individual vertex for the partial solution, or whether we - // attach ourselves to the vertex of the parent node. We can combine the head with a node of - // the step function, if - // 1) There is one parent that the partial solution connects to via a forward pattern and no - // local strategy - // 2) DOP and the number of subtasks per instance does not change - // 3) That successor is not a union - // 4) That successor is not itself the last node of the step function - // 5) There is no local strategy on the edge for the initial partial solution, as - // this translates to a local strategy that would only be executed in the first iteration - - final boolean merge; - if (mergeIterationAuxTasks && pspn.getOutgoingChannels().size() == 1) { - final Channel c = pspn.getOutgoingChannels().get(0); - final PlanNode successor = c.getTarget(); - merge = c.getShipStrategy() == ShipStrategyType.FORWARD && - c.getLocalStrategy() == LocalStrategy.NONE && - c.getTempMode() == TempMode.NONE && - successor.getDegreeOfParallelism() == pspn.getDegreeOfParallelism() && - !(successor instanceof NAryUnionPlanNode) && - successor != iteration.getRootOfStepFunction() && - iteration.getInput().getLocalStrategy() == LocalStrategy.NONE; - } else { - merge = false; - } - - // create or adopt the head vertex - final AbstractJobVertex toReturn; - final AbstractJobVertex headVertex; - final TaskConfig headConfig; - if (merge) { - final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget(); - headVertex = (AbstractJobVertex) this.vertices.get(successor); - - if (headVertex == null) { - throw new CompilerException( - "Bug: Trying to merge solution set with its sucessor, but successor has not been created."); - } - - // reset the vertex type to iteration head - headVertex.setInvokableClass(IterationHeadPactTask.class); - headConfig = new TaskConfig(headVertex.getConfiguration()); - toReturn = null; - } else { - // instantiate the head vertex and give it a no-op driver as the driver strategy. - // everything else happens in the post visit, after the input (the initial partial solution) - // is connected. - headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")"); - headVertex.setInvokableClass(IterationHeadPactTask.class); - headConfig = new TaskConfig(headVertex.getConfiguration()); - headConfig.setDriver(NoOpDriver.class); - toReturn = headVertex; - } - - // create the iteration descriptor and the iteration to it - IterationDescriptor descr = this.iterations.get(iteration); - if (descr == null) { - throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node."); - } - descr.setHeadTask(headVertex, headConfig); - - return toReturn; - } - - private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) { - // get the bulk iteration that corresponds to this partial solution node - final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode(); - - // check whether we need an individual vertex for the partial solution, or whether we - // attach ourselves to the vertex of the parent node. We can combine the head with a node of - // the step function, if - // 1) There is one parent that the partial solution connects to via a forward pattern and no - // local strategy - // 2) DOP and the number of subtasks per instance does not change - // 3) That successor is not a union - // 4) That successor is not itself the last node of the step function - // 5) There is no local strategy on the edge for the initial workset, as - // this translates to a local strategy that would only be executed in the first superstep - - final boolean merge; - if (mergeIterationAuxTasks && wspn.getOutgoingChannels().size() == 1) { - final Channel c = wspn.getOutgoingChannels().get(0); - final PlanNode successor = c.getTarget(); - merge = c.getShipStrategy() == ShipStrategyType.FORWARD && - c.getLocalStrategy() == LocalStrategy.NONE && - c.getTempMode() == TempMode.NONE && - successor.getDegreeOfParallelism() == wspn.getDegreeOfParallelism() && - !(successor instanceof NAryUnionPlanNode) && - successor != iteration.getNextWorkSetPlanNode() && - iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE; - } else { - merge = false; - } - - // create or adopt the head vertex - final AbstractJobVertex toReturn; - final AbstractJobVertex headVertex; - final TaskConfig headConfig; - if (merge) { - final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget(); - headVertex = (AbstractJobVertex) this.vertices.get(successor); - - if (headVertex == null) { - throw new CompilerException( - "Bug: Trying to merge solution set with its sucessor, but successor has not been created."); - } - - // reset the vertex type to iteration head - headVertex.setInvokableClass(IterationHeadPactTask.class); - headConfig = new TaskConfig(headVertex.getConfiguration()); - toReturn = null; - } else { - // instantiate the head vertex and give it a no-op driver as the driver strategy. - // everything else happens in the post visit, after the input (the initial partial solution) - // is connected. - headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")"); - headVertex.setInvokableClass(IterationHeadPactTask.class); - headConfig = new TaskConfig(headVertex.getConfiguration()); - headConfig.setDriver(NoOpDriver.class); - toReturn = headVertex; - } - - headConfig.setSolutionSetUnmanaged(iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged()); - - // create the iteration descriptor and the iteration to it - IterationDescriptor descr = this.iterations.get(iteration); - if (descr == null) { - throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node."); - } - descr.setHeadTask(headVertex, headConfig); - - return toReturn; - } - - private void assignDriverResources(PlanNode node, TaskConfig config) { - final double relativeMem = node.getRelativeMemoryPerSubTask(); - if (relativeMem > 0) { - config.setRelativeMemoryDriver(relativeMem); - config.setFilehandlesDriver(this.defaultMaxFan); - config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold); - } - } - - private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) { - if (c.getRelativeMemoryLocalStrategy() > 0) { - config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy()); - config.setFilehandlesInput(inputNum, this.defaultMaxFan); - config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold); - } - } - - // ------------------------------------------------------------------------ - // Connecting Vertices - // ------------------------------------------------------------------------ - - /** - * NOTE: The channel for global and local strategies are different if we connect a union. The global strategy - * channel is then the channel into the union node, the local strategy channel the one from the union to the - * actual target operator. - * - * @param channel - * @param inputNumber - * @param sourceVertex - * @param sourceConfig - * @param targetVertex - * @param targetConfig - * @param isBroadcast - * @throws CompilerException - */ - private DistributionPattern connectJobVertices(Channel channel, int inputNumber, - final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig, - final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast) - throws CompilerException - { - // ------------ connect the vertices to the job graph -------------- - final DistributionPattern distributionPattern; - - switch (channel.getShipStrategy()) { - case FORWARD: - distributionPattern = DistributionPattern.POINTWISE; - break; - case PARTITION_RANDOM: - case BROADCAST: - case PARTITION_HASH: - case PARTITION_CUSTOM: - case PARTITION_RANGE: - case PARTITION_FORCED_REBALANCE: - distributionPattern = DistributionPattern.ALL_TO_ALL; - break; - default: - throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy()); - } - - final ResultPartitionType resultType; - - switch (channel.getDataExchangeMode()) { - - case PIPELINED: - resultType = ResultPartitionType.PIPELINED; - break; - - case BATCH: - // BLOCKING results are currently not supported in closed loop iterations - // - // See https://issues.apache.org/jira/browse/FLINK-1713 for details - resultType = channel.getSource().isOnDynamicPath() - ? ResultPartitionType.PIPELINED - : ResultPartitionType.BLOCKING; - break; - - case PIPELINE_WITH_BATCH_FALLBACK: - throw new UnsupportedOperationException("Data exchange mode " + - channel.getDataExchangeMode() + " currently not supported."); - - default: - throw new UnsupportedOperationException("Unknown data exchange mode."); - - } - - targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType); - - // -------------- configure the source task's ship strategy strategies in task config -------------- - final int outputIndex = sourceConfig.getNumOutputs(); - sourceConfig.addOutputShipStrategy(channel.getShipStrategy()); - if (outputIndex == 0) { - sourceConfig.setOutputSerializer(channel.getSerializer()); - } - if (channel.getShipStrategyComparator() != null) { - sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex); - } - - if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) { - - final DataDistribution dataDistribution = channel.getDataDistribution(); - if (dataDistribution != null) { - sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex); - } else { - throw new RuntimeException("Range partitioning requires data distribution"); - // TODO: inject code and configuration for automatic histogram generation - } - } - - if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) { - if (channel.getPartitioner() != null) { - sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex); - } else { - throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set."); - } - } - - // ---------------- configure the receiver ------------------- - if (isBroadcast) { - targetConfig.addBroadcastInputToGroup(inputNumber); - } else { - targetConfig.addInputToGroup(inputNumber); - } - return distributionPattern; - } - - private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) { - // serializer - if (isBroadcastChannel) { - config.setBroadcastInputSerializer(channel.getSerializer(), inputNum); - - if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) { - throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel."); - } else { - return; - } - } else { - config.setInputSerializer(channel.getSerializer(), inputNum); - } - - // local strategy - if (channel.getLocalStrategy() != LocalStrategy.NONE) { - config.setInputLocalStrategy(inputNum, channel.getLocalStrategy()); - if (channel.getLocalStrategyComparator() != null) { - config.setInputComparator(channel.getLocalStrategyComparator(), inputNum); - } - } - - assignLocalStrategyResources(channel, config, inputNum); - - // materialization / caching - if (channel.getTempMode() != null) { - final TempMode tm = channel.getTempMode(); - - boolean needsMemory = false; - // Don't add a pipeline breaker if the data exchange is already blocking. - if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) { - config.setInputAsynchronouslyMaterialized(inputNum, true); - needsMemory = true; - } - if (tm.isCached()) { - config.setInputCached(inputNum, true); - needsMemory = true; - } - - if (needsMemory) { - // sanity check - if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) { - throw new CompilerException("Bug in compiler: Inconsistent description of input materialization."); - } - config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory()); - } - } - } - - private void finalizeBulkIteration(IterationDescriptor descr) { - - final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode(); - final AbstractJobVertex headVertex = descr.getHeadTask(); - final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration()); - final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig(); - - // ------------ finalize the head config with the final outputs and the sync gate ------------ - final int numStepFunctionOuts = headConfig.getNumOutputs(); - final int numFinalOuts = headFinalOutputConfig.getNumOutputs(); - - if (numStepFunctionOuts == 0) { - throw new CompilerException("The iteration has no operation inside the step function."); - } - - headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig); - headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts); - final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask(); - if (relativeMemForBackChannel <= 0) { - throw new CompilerException("Bug: No memory has been assigned to the iteration back channel."); - } - headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel); - - // --------------------------- create the sync task --------------------------- - final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")"); - sync.setInvokableClass(IterationSynchronizationSinkTask.class); - sync.setParallelism(1); - this.auxVertices.add(sync); - - final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); - syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism()); - - // set the number of iteration / convergence criterion for the sync - final int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations(); - if (maxNumIterations < 1) { - throw new CompilerException("Cannot create bulk iteration with unspecified maximum number of iterations."); - } - syncConfig.setNumberOfIterations(maxNumIterations); - - // connect the sync task - sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); - - // ----------------------------- create the iteration tail ------------------------------ - - final PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion(); - final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction(); - final TaskConfig tailConfig; - - AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction); - if (rootOfStepFunctionVertex == null) { - // last op is chained - final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction); - if (taskInChain == null) { - throw new CompilerException("Bug: Tail of step function not found as vertex or chained task."); - } - rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex(); - - // the fake channel is statically typed to pact record. no data is sent over this channel anyways. - tailConfig = taskInChain.getTaskConfig(); - } else { - tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration()); - } - - tailConfig.setIsWorksetUpdate(); - - // No following termination criterion - if (rootOfStepFunction.getOutgoingChannels().isEmpty()) { - - rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class); - - tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel()); - } - - - // create the fake output task for termination criterion, if needed - final TaskConfig tailConfigOfTerminationCriterion; - // If we have a termination criterion and it is not an intermediate node - if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) { - AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion); - - - if (rootOfTerminationCriterionVertex == null) { - // last op is chained - final TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion); - if (taskInChain == null) { - throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task."); - } - rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex(); - - // the fake channel is statically typed to pact record. no data is sent over this channel anyways. - tailConfigOfTerminationCriterion = taskInChain.getTaskConfig(); - } else { - tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration()); - } - - rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class); - // Hack - tailConfigOfTerminationCriterion.setIsSolutionSetUpdate(); - tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel()); - - // tell the head that it needs to wait for the solution set updates - headConfig.setWaitForSolutionSetUpdate(); - } - - // ------------------- register the aggregators ------------------- - AggregatorRegistry aggs = bulkNode.getIterationNode().getIterationContract().getAggregators(); - Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators(); - - headConfig.addIterationAggregators(allAggregators); - syncConfig.addIterationAggregators(allAggregators); - - String convAggName = aggs.getConvergenceCriterionAggregatorName(); - ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion(); - - if (convCriterion != null || convAggName != null) { - if (convCriterion == null) { - throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null."); - } - if (convAggName == null) { - throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null."); - } - - syncConfig.setConvergenceCriterion(convAggName, convCriterion); - } - } - - private void finalizeWorksetIteration(IterationDescriptor descr) { - final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode(); - final AbstractJobVertex headVertex = descr.getHeadTask(); - final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration()); - final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig(); - - // ------------ finalize the head config with the final outputs and the sync gate ------------ - { - final int numStepFunctionOuts = headConfig.getNumOutputs(); - final int numFinalOuts = headFinalOutputConfig.getNumOutputs(); - - if (numStepFunctionOuts == 0) { - throw new CompilerException("The workset iteration has no operation on the workset inside the step function."); - } - - headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig); - headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts); - final double relativeMemory = iterNode.getRelativeMemoryPerSubTask(); - if (relativeMemory <= 0) { - throw new CompilerException("Bug: No memory has been assigned to the workset iteration."); - } - - headConfig.setIsWorksetIteration(); - headConfig.setRelativeBackChannelMemory(relativeMemory / 2); - headConfig.setRelativeSolutionSetMemory(relativeMemory / 2); - - // set the solution set serializer and comparator - headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer()); - headConfig.setSolutionSetComparator(iterNode.getSolutionSetComparator()); - } - - // --------------------------- create the sync task --------------------------- - final TaskConfig syncConfig; - { - final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")"); - sync.setInvokableClass(IterationSynchronizationSinkTask.class); - sync.setParallelism(1); - this.auxVertices.add(sync); - - syncConfig = new TaskConfig(sync.getConfiguration()); - syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism()); - - // set the number of iteration / convergence criterion for the sync - final int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations(); - if (maxNumIterations < 1) { - throw new CompilerException("Cannot create workset iteration with unspecified maximum number of iterations."); - } - syncConfig.setNumberOfIterations(maxNumIterations); - - // connect the sync task - sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); - } - - // ----------------------------- create the iteration tails ----------------------------- - // ----------------------- for next workset and solution set delta----------------------- - - { - // we have three possible cases: - // 1) Two tails, one for workset update, one for solution set update - // 2) One tail for workset update, solution set update happens in an intermediate task - // 3) One tail for solution set update, workset update happens in an intermediate task - - final PlanNode nextWorksetNode = iterNode.getNextWorkSetPlanNode(); - final PlanNode solutionDeltaNode = iterNode.getSolutionSetDeltaPlanNode(); - - final boolean hasWorksetTail = nextWorksetNode.getOutgoingChannels().isEmpty(); - final boolean hasSolutionSetTail = (!iterNode.isImmediateSolutionSetUpdate()) || (!hasWorksetTail); - - { - // get the vertex for the workset update - final TaskConfig worksetTailConfig; - AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode); - if (nextWorksetVertex == null) { - // nextWorksetVertex is chained - TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode); - if (taskInChain == null) { - throw new CompilerException("Bug: Next workset node not found as vertex or chained task."); - } - nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex(); - worksetTailConfig = taskInChain.getTaskConfig(); - } else { - worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration()); - } - - // mark the node to perform workset updates - worksetTailConfig.setIsWorksetIteration(); - worksetTailConfig.setIsWorksetUpdate(); - - if (hasWorksetTail) { - nextWorksetVertex.setInvokableClass(IterationTailPactTask.class); - - worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer()); - } - } - { - final TaskConfig solutionDeltaConfig; - AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode); - if (solutionDeltaVertex == null) { - // last op is chained - TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode); - if (taskInChain == null) { - throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task."); - } - solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex(); - solutionDeltaConfig = taskInChain.getTaskConfig(); - } else { - solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration()); - } - - solutionDeltaConfig.setIsWorksetIteration(); - solutionDeltaConfig.setIsSolutionSetUpdate(); - - if (hasSolutionSetTail) { - solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class); - - solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer()); - - // tell the head that it needs to wait for the solution set updates - headConfig.setWaitForSolutionSetUpdate(); - } - else { - // no tail, intermediate update. must be immediate update - if (!iterNode.isImmediateSolutionSetUpdate()) { - throw new CompilerException("A solution set update without dedicated tail is not set to perform immediate updates."); - } - solutionDeltaConfig.setIsSolutionSetUpdateWithoutReprobe(); - } - } - } - - // ------------------- register the aggregators ------------------- - AggregatorRegistry aggs = iterNode.getIterationNode().getIterationContract().getAggregators(); - Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators(); - - for (AggregatorWithName<?> agg : allAggregators) { - if (agg.getName().equals(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME)) { - throw new CompilerException("User defined aggregator used the same name as built-in workset " + - "termination check aggregator: " + WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME); - } - } - - headConfig.addIterationAggregators(allAggregators); - syncConfig.addIterationAggregators(allAggregators); - - String convAggName = aggs.getConvergenceCriterionAggregatorName(); - ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion(); - - if (convCriterion != null || convAggName != null) { - throw new CompilerException("Error: Cannot use custom convergence criterion with workset iteration. Workset iterations have implicit convergence criterion where workset is empty."); - } - - headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator()); - syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator()); - syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion()); - } - - private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) { - try { - if (wrapper.hasObject()) { - try { - return wrapper.getUserCodeObject().toString(); - } - catch (Throwable t) { - return wrapper.getUserCodeClass().getName(); - } - } - else { - return wrapper.getUserCodeClass().getName(); - } - } - catch (Throwable t) { - return null; - } - } - - // ------------------------------------------------------------------------------------- - // Descriptors for tasks / configurations that are chained or merged with other tasks - // ------------------------------------------------------------------------------------- - - /** - * Utility class that describes a task in a sequence of chained tasks. Chained tasks are tasks that run - * together in one thread. - */ - private static final class TaskInChain { - - private final Class<? extends ChainedDriver<?, ?>> chainedTask; - - private final TaskConfig taskConfig; - - private final String taskName; - - private AbstractJobVertex containingVertex; - - TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig, - String taskName) { - this.chainedTask = chainedTask; - this.taskConfig = taskConfig; - this.taskName = taskName; - } - - public Class<? extends ChainedDriver<?, ?>> getChainedTask() { - return this.chainedTask; - } - - public TaskConfig getTaskConfig() { - return this.taskConfig; - } - - public String getTaskName() { - return this.taskName; - } - - public AbstractJobVertex getContainingVertex() { - return this.containingVertex; - } - - public void setContainingVertex(AbstractJobVertex containingVertex) { - this.containingVertex = containingVertex; - } - } - - private static final class IterationDescriptor { - - private final IterationPlanNode iterationNode; - - private AbstractJobVertex headTask; - - private TaskConfig headConfig; - - private TaskConfig headFinalResultConfig; - - private final int id; - - public IterationDescriptor(IterationPlanNode iterationNode, int id) { - this.iterationNode = iterationNode; - this.id = id; - } - - public IterationPlanNode getIterationNode() { - return iterationNode; - } - - public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) { - this.headTask = headTask; - this.headFinalResultConfig = new TaskConfig(new Configuration()); - - // check if we already had a configuration, for example if the solution set was - if (this.headConfig != null) { - headConfig.getConfiguration().addAll(this.headConfig.getConfiguration()); - } - - this.headConfig = headConfig; - } - - public AbstractJobVertex getHeadTask() { - return headTask; - } - - public TaskConfig getHeadFinalResultConfig() { - return headFinalResultConfig; - } - - public int getId() { - return this.id; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java index 3fd7457..2d8377e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java @@ -89,7 +89,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im } catch (ConflictingFieldTypeInfoException e) { throw new CompilerPostPassException("Conflicting type infomation for the data sink '" + - sn.getSinkNode().getPactContract().getName() + "'."); + sn.getSinkNode().getOperator().getName() + "'."); } // descend to the input channel @@ -98,7 +98,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im } catch (MissingFieldTypeInfoException ex) { throw new CompilerPostPassException("Missing type infomation for the channel that inputs to the data sink '" + - sn.getSinkNode().getPactContract().getName() + "'."); + sn.getSinkNode().getOperator().getName() + "'."); } } else if (node instanceof SourcePlanNode) { @@ -122,7 +122,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im // add the parent schema to the schema if (propagateParentSchemaDown) { - addSchemaToSchema(parentSchema, schema, iterationNode.getPactContract().getName()); + addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName()); } // check whether all outgoing channels have not yet contributed. come back later if not. @@ -168,7 +168,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im // take the schema from the partial solution node and add its fields to the iteration result schema. // input and output schema need to be identical, so this is essentially a sanity check - addSchemaToSchema(pss, schema, iterationNode.getPactContract().getName()); + addSchemaToSchema(pss, schema, iterationNode.getProgramOperator().getName()); // set the serializer if (createUtilities) { @@ -180,7 +180,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(schema, iterationNode.getInput(), createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '" - + iterationNode.getPactContract().getName() + "'. Missing type information for key field " + + + iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " + e.getFieldNumber()); } } @@ -199,7 +199,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im // add the parent schema to the schema (which refers to the solution set schema) if (propagateParentSchemaDown) { - addSchemaToSchema(parentSchema, schema, iterationNode.getPactContract().getName()); + addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName()); } // check whether all outgoing channels have not yet contributed. come back later if not. @@ -242,7 +242,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im } } catch (ConflictingFieldTypeInfoException e) { throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber() - + " in node '" + iterationNode.getPactContract().getName() + "'. Contradicting types between the " + + + " in node '" + iterationNode.getProgramOperator().getName() + "'. Contradicting types between the " + "result of the iteration and the solution set schema: " + e.getPreviousType() + " and " + e.getNewType() + ". Most probable cause: Invalid constant field annotations."); } @@ -256,7 +256,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im iterationNode.setSolutionSetComparator(createComparator(optNode.getSolutionSetKeyFields(), null, sss)); } catch (MissingFieldTypeInfoException ex) { throw new CompilerPostPassException("Could not set up the solution set for workset iteration '" + - optNode.getPactContract().getName() + "'. Missing type information for key field " + ex.getFieldNumber() + '.'); + optNode.getOperator().getName() + "'. Missing type information for key field " + ex.getFieldNumber() + '.'); } } @@ -266,7 +266,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(wss, iterationNode.getInitialWorksetInput(), createUtilities); } catch (MissingFieldTypeInfoException ex) { throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '" - + iterationNode.getPactContract().getName() + "'. Missing type information for key field " + + + iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " + ex.getFieldNumber()); } } @@ -298,7 +298,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im try { getSingleInputNodeSchema(sn, schema); } catch (ConflictingFieldTypeInfoException e) { - throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getPactContract().getName())); + throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName())); } if (createUtilities) { @@ -308,7 +308,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im sn.setComparator(createComparator(sn.getKeys(i), sn.getSortOrders(i), schema),i); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for node '" + - optNode.getPactContract().getName() + "'. Missing type information for key field " + + optNode.getOperator().getName() + "'. Missing type information for key field " + e.getFieldNumber()); } } @@ -319,7 +319,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(schema, sn.getInput(), createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '" + - optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } // don't forget the broadcast inputs @@ -328,7 +328,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(createEmptySchema(), c, createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" + - optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } } } @@ -367,7 +367,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im try { getDualInputNodeSchema(dn, schema1, schema2); } catch (ConflictingFieldTypeInfoException e) { - throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getPactContract().getName())); + throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName())); } // parameterize the node's driver strategy @@ -379,7 +379,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im dn.setComparator2(createComparator(dn.getKeysForInput2(), dn.getSortOrders(), schema2)); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for node '" + - optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } // set the pair comparator @@ -388,7 +388,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im dn.getSortOrders(), schema1, schema2)); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for node '" + - optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } } @@ -399,13 +399,13 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(schema1, dn.getInput1(), createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for the first input channel to node '" - + optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } try { propagateToChannel(schema2, dn.getInput2(), createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for the second input channel to node '" - + optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } // don't forget the broadcast inputs @@ -414,7 +414,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im propagateToChannel(createEmptySchema(), c, createUtilities); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" + - optNode.getPactContract().getName() + "'. Missing type information for field " + e.getFieldNumber()); + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber()); } } } @@ -446,7 +446,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im schema = (T) psn.postPassHelper; } name = "partial solution of bulk iteration '" + - psn.getPartialSolutionNode().getIterationNode().getPactContract().getName() + "'"; + psn.getPartialSolutionNode().getIterationNode().getOperator().getName() + "'"; } else if (node instanceof SolutionSetPlanNode) { SolutionSetPlanNode ssn = (SolutionSetPlanNode) node; @@ -457,7 +457,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im schema = (T) ssn.postPassHelper; } name = "solution set of workset iteration '" + - ssn.getSolutionSetNode().getIterationNode().getPactContract().getName() + "'"; + ssn.getSolutionSetNode().getIterationNode().getOperator().getName() + "'"; } else if (node instanceof WorksetPlanNode) { WorksetPlanNode wsn = (WorksetPlanNode) node; @@ -468,7 +468,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im schema = (T) wsn.postPassHelper; } name = "workset of workset iteration '" + - wsn.getWorksetNode().getIterationNode().getPactContract().getName() + "'"; + wsn.getWorksetNode().getIterationNode().getOperator().getName() + "'"; } else { throw new CompilerException(); } @@ -531,7 +531,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im } } catch (ConflictingFieldTypeInfoException e) { throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber() - + " in node '" + optNode.getPactContract().getName() + "' propagated from successor node. " + + + " in node '" + optNode.getOperator().getName() + "' propagated from successor node. " + "Conflicting types: " + e.getPreviousType() + " and " + e.getNewType() + ". Most probable cause: Invalid constant field annotations."); } @@ -550,7 +550,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im return createSerializer(schema); } catch (MissingFieldTypeInfoException e) { throw new CompilerPostPassException("Missing type information while creating serializer for '" + - node.getPactContract().getName() + "'."); + node.getProgramOperator().getName() + "'."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java index 55d3269..5fdf3dd 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java @@ -114,7 +114,7 @@ public class JavaApiPostPass implements OptimizerPostPass { traverseChannel(addMapper.getInput()); } - BulkIterationBase<?> operator = (BulkIterationBase<?>) iterationNode.getPactContract(); + BulkIterationBase<?> operator = (BulkIterationBase<?>) iterationNode.getProgramOperator(); // set the serializer iterationNode.setSerializerForIterationChannel(createSerializer(operator.getOperatorInfo().getOutputType())); @@ -133,7 +133,7 @@ public class JavaApiPostPass implements OptimizerPostPass { throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node."); } - DeltaIterationBase<?, ?> operator = (DeltaIterationBase<?, ?>) iterationNode.getPactContract(); + DeltaIterationBase<?, ?> operator = (DeltaIterationBase<?, ?>) iterationNode.getProgramOperator(); // set the serializers and comparators for the workset iteration iterationNode.setSolutionSetSerializer(createSerializer(operator.getOperatorInfo().getFirstInputType())); @@ -152,10 +152,10 @@ public class JavaApiPostPass implements OptimizerPostPass { else if (node instanceof SingleInputPlanNode) { SingleInputPlanNode sn = (SingleInputPlanNode) node; - if (!(sn.getOptimizerNode().getPactContract() instanceof SingleInputOperator)) { + if (!(sn.getOptimizerNode().getOperator() instanceof SingleInputOperator)) { // Special case for delta iterations - if(sn.getOptimizerNode().getPactContract() instanceof NoOpUnaryUdfOp) { + if(sn.getOptimizerNode().getOperator() instanceof NoOpUnaryUdfOp) { traverseChannel(sn.getInput()); return; } else { @@ -163,7 +163,7 @@ public class JavaApiPostPass implements OptimizerPostPass { } } - SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getPactContract(); + SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getOperator(); // parameterize the node's driver strategy for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) { @@ -181,11 +181,11 @@ public class JavaApiPostPass implements OptimizerPostPass { else if (node instanceof DualInputPlanNode) { DualInputPlanNode dn = (DualInputPlanNode) node; - if (!(dn.getOptimizerNode().getPactContract() instanceof DualInputOperator)) { + if (!(dn.getOptimizerNode().getOperator() instanceof DualInputOperator)) { throw new RuntimeException("Wrong operator type found in post pass."); } - DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getPactContract(); + DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getOperator(); // parameterize the node's driver strategy if (dn.getDriverStrategy().getNumRequiredComparators() > 0) { @@ -229,7 +229,7 @@ public class JavaApiPostPass implements OptimizerPostPass { private void traverseChannel(Channel channel) { PlanNode source = channel.getSource(); - Operator<?> javaOp = source.getPactContract(); + Operator<?> javaOp = source.getProgramOperator(); // if (!(javaOp instanceof BulkIteration) && !(javaOp instanceof JavaPlanNode)) { // throw new RuntimeException("Wrong operator type found in post pass: " + javaOp); @@ -271,7 +271,7 @@ public class JavaApiPostPass implements OptimizerPostPass { @SuppressWarnings("unchecked") private static <T> TypeInformation<T> getTypeInfoFromSource(SourcePlanNode node) { - Operator<?> op = node.getOptimizerNode().getPactContract(); + Operator<?> op = node.getOptimizerNode().getOperator(); if (op instanceof GenericDataSourceBase) { return ((GenericDataSourceBase<T, ?>) op).getOperatorInfo().getOutputType(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java index dae7a2e..8a2d006 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java @@ -54,7 +54,7 @@ public class RecordModelPostPass extends GenericFlatTypePostPass<Class<? extends @Override protected void getSinkSchema(SinkPlanNode sinkPlanNode, SparseKeySchema schema) throws CompilerPostPassException { - GenericDataSinkBase<?> sink = sinkPlanNode.getSinkNode().getPactContract(); + GenericDataSinkBase<?> sink = sinkPlanNode.getSinkNode().getOperator(); Ordering partitioning = sink.getPartitionOrdering(); Ordering sorting = sink.getLocalOrder(); @@ -76,7 +76,7 @@ public class RecordModelPostPass extends GenericFlatTypePostPass<Class<? extends throws CompilerPostPassException, ConflictingFieldTypeInfoException { // check that we got the right types - SingleInputOperator<?, ?, ?> contract = (SingleInputOperator<?, ?, ?>) node.getSingleInputNode().getPactContract(); + SingleInputOperator<?, ?, ?> contract = (SingleInputOperator<?, ?, ?>) node.getSingleInputNode().getOperator(); if (! (contract instanceof RecordOperator)) { throw new CompilerPostPassException("Error: Operator is not a Record based contract. Wrong compiler invokation."); } @@ -103,7 +103,7 @@ public class RecordModelPostPass extends GenericFlatTypePostPass<Class<? extends throws CompilerPostPassException, ConflictingFieldTypeInfoException { // add the nodes local information. this automatically consistency checks - DualInputOperator<?, ?, ?, ?> contract = node.getTwoInputNode().getPactContract(); + DualInputOperator<?, ?, ?, ?> contract = node.getTwoInputNode().getOperator(); if (! (contract instanceof RecordOperator)) { throw new CompilerPostPassException("Error: Operator is not a Pact Record based contract. Wrong compiler invokation."); }