[FLINK-441] [optimizer] Removed obsolete plan validator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c91fb7c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c91fb7c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c91fb7c2 Branch: refs/heads/master Commit: c91fb7c2f058272d8207a1d2e9396755c75cd853 Parents: 258cede Author: Stephan Ewen <se...@apache.org> Authored: Tue Mar 17 10:57:08 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 20 10:21:14 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/LocalExecutor.java | 4 - .../org/apache/flink/client/program/Client.java | 5 +- .../optimizer/contextcheck/ContextChecker.java | 211 ------------------- .../contextcheck/MissingChildException.java | 35 --- .../optimizer/contextcheck/Validatable.java | 30 --- .../mapred/record/HadoopDataSink.java | 13 +- 6 files changed, 2 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 46598c1..a2aed8f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.PactCompiler; -import org.apache.flink.optimizer.contextcheck.ContextChecker; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; @@ -136,9 +135,6 @@ public class LocalExecutor extends PlanExecutor { throw new IllegalArgumentException("The plan may not be null."); } - ContextChecker checker = new ContextChecker(); - checker.check(plan); - synchronized (this.lock) { // check if we start a session dedicated for this execution http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index ce77275..84e6637 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -33,7 +33,6 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.PactCompiler; -import org.apache.flink.optimizer.contextcheck.ContextChecker; import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.FlinkPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -194,9 +193,7 @@ public class Client { if (parallelism > 0 && p.getDefaultParallelism() <= 0) { p.setDefaultParallelism(parallelism); } - - ContextChecker checker = new ContextChecker(); - checker.check(p); + return this.compiler.compile(p); } http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java deleted file mode 100644 index da73d36..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.contextcheck; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.DualInputOperator; -import org.apache.flink.api.common.operators.GenericDataSinkBase; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.SingleInputOperator; -import org.apache.flink.api.common.operators.base.BulkIterationBase; -import org.apache.flink.api.common.operators.base.FileDataSinkBase; -import org.apache.flink.api.common.operators.base.FileDataSourceBase; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.Visitor; - -import com.google.common.base.Preconditions; - -/** - * Traverses a plan and checks whether all Contracts are correctly connected to - * their input contracts. - */ -public class ContextChecker implements Visitor<Operator<?>> { - - /** - * A set of all already visited nodes during DAG traversal. Is used - * to avoid processing one node multiple times. - */ - private final Set<Operator<?>> visitedNodes = new HashSet<Operator<?>>(); - - /** - * Default constructor - */ - public ContextChecker() {} - - /** - * Checks whether the given plan is valid. In particular it is checked that - * all contracts have the correct number of inputs and all inputs are of the - * expected type. In case of an invalid plan an extended RuntimeException is - * thrown. - * - * @param plan - * The plan to check. - */ - public void check(Plan plan) { - Preconditions.checkNotNull(plan, "The passed plan is null."); - this.visitedNodes.clear(); - plan.accept(this); - } - - /** - * Checks whether the node is correctly connected to its input. - */ - @Override - public boolean preVisit(Operator<?> node) { - // check if node was already visited - if (!this.visitedNodes.add(node)) { - return false; - } - - // apply the appropriate check method - if (node instanceof FileDataSinkBase) { - checkFileDataSink((FileDataSinkBase<?>) node); - } else if (node instanceof FileDataSourceBase) { - checkFileDataSource((FileDataSourceBase<?>) node); - } else if (node instanceof GenericDataSinkBase) { - checkDataSink((GenericDataSinkBase<?>) node); - } else if (node instanceof BulkIterationBase) { - checkBulkIteration((BulkIterationBase<?>) node); - } else if (node instanceof SingleInputOperator) { - checkSingleInputContract((SingleInputOperator<?, ?, ?>) node); - } else if (node instanceof DualInputOperator<?, ?, ?, ?>) { - checkDualInputContract((DualInputOperator<?, ?, ?, ?>) node); - } - if(node instanceof Validatable) { - ((Validatable) node).check(); - } - return true; - } - - @Override - public void postVisit(Operator<?> node) {} - - /** - * Checks if DataSinkContract is correctly connected. In case that the - * contract is incorrectly connected a RuntimeException is thrown. - * - * @param dataSinkContract - * DataSinkContract that is checked. - */ - private void checkDataSink(GenericDataSinkBase<?> dataSinkContract) { - Operator<?> input = dataSinkContract.getInput(); - // check if input exists - if (input == null) { - throw new MissingChildException(); - } - } - - /** - * Checks if FileDataSink is correctly connected. In case that the - * contract is incorrectly connected a RuntimeException is thrown. - * - * @param fileSink - * FileDataSink that is checked. - */ - private void checkFileDataSink(FileDataSinkBase<?> fileSink) { - String path = fileSink.getFilePath(); - if (path == null) { - throw new InvalidProgramException("File path of FileDataSink is null."); - } - if (path.length() == 0) { - throw new InvalidProgramException("File path of FileDataSink is empty string."); - } - - try { - Path p = new Path(path); - String scheme = p.toUri().getScheme(); - - if (scheme == null) { - throw new InvalidProgramException("File path \"" + path + "\" of FileDataSink has no file system scheme (like 'file:// or hdfs://')."); - } - } catch (Exception e) { - throw new InvalidProgramException("File path \"" + path + "\" of FileDataSink is an invalid path: " + e.getMessage()); - } - checkDataSink(fileSink); - } - - /** - * Checks if FileDataSource is correctly connected. In case that the - * contract is incorrectly connected a RuntimeException is thrown. - * - * @param fileSource - * FileDataSource that is checked. - */ - private void checkFileDataSource(FileDataSourceBase<?> fileSource) { - String path = fileSource.getFilePath(); - if (path == null) { - throw new InvalidProgramException("File path of FileDataSource is null."); - } - if (path.length() == 0) { - throw new InvalidProgramException("File path of FileDataSource is empty string."); - } - - try { - Path p = new Path(path); - String scheme = p.toUri().getScheme(); - - if (scheme == null) { - throw new InvalidProgramException("File path \"" + path + "\" of FileDataSource has no file system scheme (like 'file:// or hdfs://')."); - } - } catch (Exception e) { - throw new InvalidProgramException("File path \"" + path + "\" of FileDataSource is an invalid path: " + e.getMessage()); - } - } - - /** - * Checks whether a SingleInputOperator is correctly connected. In case that - * the contract is incorrectly connected a RuntimeException is thrown. - * - * @param singleInputContract - * SingleInputOperator that is checked. - */ - private void checkSingleInputContract(SingleInputOperator<?, ?, ?> singleInputContract) { - Operator<?> input = singleInputContract.getInput(); - // check if input exists - if (input == null) { - throw new MissingChildException(); - } - } - - /** - * Checks whether a DualInputOperator is correctly connected. In case that - * the contract is incorrectly connected a RuntimeException is thrown. - * - * @param dualInputContract - * DualInputOperator that is checked. - */ - private void checkDualInputContract(DualInputOperator<?, ?, ?, ?> dualInputContract) { - Operator<?> input1 = dualInputContract.getFirstInput(); - Operator<?> input2 = dualInputContract.getSecondInput(); - // check if input exists - if (input1 == null || input2 == null) { - throw new MissingChildException(); - } - } - - private void checkBulkIteration(BulkIterationBase<?> iter) { - iter.validate(); - checkSingleInputContract(iter); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java deleted file mode 100644 index 04b2b4c..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.contextcheck; - -import org.apache.flink.api.common.InvalidProgramException; - -public class MissingChildException extends InvalidProgramException { - - private static final long serialVersionUID = 4206417538759568484L; - - public MissingChildException(String message) { - super(message); - } - - public MissingChildException() { - super(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java deleted file mode 100644 index 161bd66..0000000 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.optimizer.contextcheck; - -/** - * Operators implementing this interface - * will be called from the {@link ContextChecker} during - * the compilation process. - * - */ -public interface Validatable { - public void check(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java index a86cc62..3b8064f 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.hadoopcompatibility.mapred.record; import java.util.Collections; @@ -24,11 +23,9 @@ import java.util.List; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.java.record.operators.GenericDataSink; -import org.apache.flink.optimizer.contextcheck.Validatable; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter; import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter; import org.apache.flink.types.Record; -import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -45,7 +42,7 @@ import org.apache.hadoop.mapred.OutputFormat; * * The HadoopDataSink provides a default converter: {@link org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter} **/ -public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable { +public class HadoopDataSink<K,V> extends GenericDataSink { private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>"; @@ -98,12 +95,4 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable public JobConf getJobConf() { return this.jobConf; } - - @Override - public void check() { - // see for more details https://github.com/stratosphere/stratosphere/pull/531 - if (FileOutputFormat.getOutputPath(jobConf) == null) { - throw new NullPointerException("The HadoopDataSink currently expects a correct outputPath."); - } - } }