[FLINK-441] [optimizer] Removed obsolete plan validator

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c91fb7c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c91fb7c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c91fb7c2

Branch: refs/heads/master
Commit: c91fb7c2f058272d8207a1d2e9396755c75cd853
Parents: 258cede
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 17 10:57:08 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 10:21:14 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |   4 -
 .../org/apache/flink/client/program/Client.java |   5 +-
 .../optimizer/contextcheck/ContextChecker.java  | 211 -------------------
 .../contextcheck/MissingChildException.java     |  35 ---
 .../optimizer/contextcheck/Validatable.java     |  30 ---
 .../mapred/record/HadoopDataSink.java           |  13 +-
 6 files changed, 2 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 46598c1..a2aed8f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.PactCompiler;
-import org.apache.flink.optimizer.contextcheck.ContextChecker;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -136,9 +135,6 @@ public class LocalExecutor extends PlanExecutor {
                        throw new IllegalArgumentException("The plan may not be 
null.");
                }
                
-               ContextChecker checker = new ContextChecker();
-               checker.check(plan);
-               
                synchronized (this.lock) {
                        
                        // check if we start a session dedicated for this 
execution

http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index ce77275..84e6637 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -33,7 +33,6 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.PactCompiler;
-import org.apache.flink.optimizer.contextcheck.ContextChecker;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -194,9 +193,7 @@ public class Client {
                if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
                        p.setDefaultParallelism(parallelism);
                }
-               
-               ContextChecker checker = new ContextChecker();
-               checker.check(p);
+
                return this.compiler.compile(p);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java
deleted file mode 100644
index da73d36..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/ContextChecker.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.contextcheck;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.FileDataSinkBase;
-import org.apache.flink.api.common.operators.base.FileDataSourceBase;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Traverses a plan and checks whether all Contracts are correctly connected to
- * their input contracts.
- */
-public class ContextChecker implements Visitor<Operator<?>> {
-
-       /**
-        * A set of all already visited nodes during DAG traversal. Is used
-        * to avoid processing one node multiple times.
-        */
-       private final Set<Operator<?>> visitedNodes = new 
HashSet<Operator<?>>();
-
-       /**
-        * Default constructor
-        */
-       public ContextChecker() {}
-
-       /**
-        * Checks whether the given plan is valid. In particular it is checked 
that
-        * all contracts have the correct number of inputs and all inputs are 
of the
-        * expected type. In case of an invalid plan an extended 
RuntimeException is
-        * thrown.
-        * 
-        * @param plan
-        *        The plan to check.
-        */
-       public void check(Plan plan) {
-               Preconditions.checkNotNull(plan, "The passed plan is null.");
-               this.visitedNodes.clear();
-               plan.accept(this);
-       }
-
-       /**
-        * Checks whether the node is correctly connected to its input.
-        */
-       @Override
-       public boolean preVisit(Operator<?> node) {
-               // check if node was already visited
-               if (!this.visitedNodes.add(node)) {
-                       return false;
-               }
-
-               // apply the appropriate check method
-               if (node instanceof FileDataSinkBase) {
-                       checkFileDataSink((FileDataSinkBase<?>) node);
-               } else if (node instanceof FileDataSourceBase) {
-                       checkFileDataSource((FileDataSourceBase<?>) node);
-               } else if (node instanceof GenericDataSinkBase) {
-                       checkDataSink((GenericDataSinkBase<?>) node);
-               } else if (node instanceof BulkIterationBase) {
-                       checkBulkIteration((BulkIterationBase<?>) node);
-               } else if (node instanceof SingleInputOperator) {
-                       checkSingleInputContract((SingleInputOperator<?, ?, ?>) 
node);
-               } else if (node instanceof DualInputOperator<?, ?, ?, ?>) {
-                       checkDualInputContract((DualInputOperator<?, ?, ?, ?>) 
node);
-               }
-               if(node instanceof Validatable) {
-                       ((Validatable) node).check();
-               }
-               return true;
-       }
-
-       @Override
-       public void postVisit(Operator<?> node) {}
-
-       /**
-        * Checks if DataSinkContract is correctly connected. In case that the
-        * contract is incorrectly connected a RuntimeException is thrown.
-        * 
-        * @param dataSinkContract
-        *        DataSinkContract that is checked.
-        */
-       private void checkDataSink(GenericDataSinkBase<?> dataSinkContract) {
-               Operator<?> input = dataSinkContract.getInput();
-               // check if input exists
-               if (input == null) {
-                       throw new MissingChildException();
-               }
-       }
-       
-       /**
-        * Checks if FileDataSink is correctly connected. In case that the
-        * contract is incorrectly connected a RuntimeException is thrown.
-        * 
-        * @param fileSink
-        *        FileDataSink that is checked.
-        */
-       private void checkFileDataSink(FileDataSinkBase<?> fileSink) {
-               String path = fileSink.getFilePath();
-               if (path == null) {
-                       throw new InvalidProgramException("File path of 
FileDataSink is null.");
-               }
-               if (path.length() == 0) {
-                       throw new InvalidProgramException("File path of 
FileDataSink is empty string.");
-               }
-               
-               try {
-                       Path p = new Path(path);
-                       String scheme = p.toUri().getScheme();
-                       
-                       if (scheme == null) {
-                               throw new InvalidProgramException("File path 
\"" + path + "\" of FileDataSink has no file system scheme (like 'file:// or 
hdfs://').");
-                       }
-               } catch (Exception e) {
-                       throw new InvalidProgramException("File path \"" + path 
+ "\" of FileDataSink is an invalid path: " + e.getMessage());
-               }
-               checkDataSink(fileSink);
-       }
-       
-       /**
-        * Checks if FileDataSource is correctly connected. In case that the
-        * contract is incorrectly connected a RuntimeException is thrown.
-        * 
-        * @param fileSource
-        *        FileDataSource that is checked.
-        */
-       private void checkFileDataSource(FileDataSourceBase<?> fileSource) {
-               String path = fileSource.getFilePath();
-               if (path == null) {
-                       throw new InvalidProgramException("File path of 
FileDataSource is null.");
-               }
-               if (path.length() == 0) {
-                       throw new InvalidProgramException("File path of 
FileDataSource is empty string.");
-               }
-               
-               try {
-                       Path p = new Path(path);
-                       String scheme = p.toUri().getScheme();
-                       
-                       if (scheme == null) {
-                               throw new InvalidProgramException("File path 
\"" + path + "\" of FileDataSource has no file system scheme (like 'file:// or 
hdfs://').");
-                       }
-               } catch (Exception e) {
-                       throw new InvalidProgramException("File path \"" + path 
+ "\" of FileDataSource is an invalid path: " + e.getMessage());
-               }
-       }
-
-       /**
-        * Checks whether a SingleInputOperator is correctly connected. In case 
that
-        * the contract is incorrectly connected a RuntimeException is thrown.
-        * 
-        * @param singleInputContract
-        *        SingleInputOperator that is checked.
-        */
-       private void checkSingleInputContract(SingleInputOperator<?, ?, ?> 
singleInputContract) {
-               Operator<?> input = singleInputContract.getInput();
-               // check if input exists
-               if (input == null) {
-                       throw new MissingChildException();
-               }
-       }
-
-       /**
-        * Checks whether a DualInputOperator is correctly connected. In case 
that
-        * the contract is incorrectly connected a RuntimeException is thrown.
-        * 
-        * @param dualInputContract
-        *        DualInputOperator that is checked.
-        */
-       private void checkDualInputContract(DualInputOperator<?, ?, ?, ?> 
dualInputContract) {
-               Operator<?> input1 = dualInputContract.getFirstInput();
-               Operator<?> input2 = dualInputContract.getSecondInput();
-               // check if input exists
-               if (input1 == null || input2 == null) {
-                       throw new MissingChildException();
-               }
-       }
-       
-       private void checkBulkIteration(BulkIterationBase<?> iter) {
-               iter.validate();
-               checkSingleInputContract(iter);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java
deleted file mode 100644
index 04b2b4c..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/MissingChildException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.contextcheck;
-
-import org.apache.flink.api.common.InvalidProgramException;
-
-public class MissingChildException extends InvalidProgramException {
-       
-       private static final long serialVersionUID = 4206417538759568484L;
-       
-       public MissingChildException(String message) {
-               super(message);
-       }
-       
-       public MissingChildException() {
-               super();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java
deleted file mode 100644
index 161bd66..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/contextcheck/Validatable.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.contextcheck;
-
-/**
- * Operators implementing this interface
- * will be called from the {@link ContextChecker} during
- * the compilation process.
- *
- */
-public interface Validatable {
-       public void check();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c91fb7c2/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
index a86cc62..3b8064f 100644
--- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
+++ 
b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record;
 
 import java.util.Collections;
@@ -24,11 +23,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.optimizer.contextcheck.Validatable;
 import 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter;
 import 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
 import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
@@ -45,7 +42,7 @@ import org.apache.hadoop.mapred.OutputFormat;
  *
  * The HadoopDataSink provides a default converter: {@link 
org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter}
  **/
-public class HadoopDataSink<K,V> extends GenericDataSink implements 
Validatable {
+public class HadoopDataSink<K,V> extends GenericDataSink {
 
        private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
 
@@ -98,12 +95,4 @@ public class HadoopDataSink<K,V> extends GenericDataSink 
implements Validatable
        public JobConf getJobConf() {
                return this.jobConf;
        }
-
-       @Override
-       public void check() {
-               // see for more details 
https://github.com/stratosphere/stratosphere/pull/531
-               if (FileOutputFormat.getOutputPath(jobConf) == null) {
-                       throw new NullPointerException("The HadoopDataSink 
currently expects a correct outputPath.");
-               }
-       }
 }

Reply via email to