[FLINK-2050] [ml] Introduces new pipelining mechanism using implicit classes to wrap the algorithm logic
This closes #704. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fde0341f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fde0341f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fde0341f Branch: refs/heads/master Commit: fde0341fe16c7258e42f77e289a557157995830c Parents: b602b2e Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue May 5 15:04:32 2015 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri May 22 09:42:21 2015 +0200 ---------------------------------------------------------------------- .../src/test/assembly/test-assembly.xml | 2 +- .../flink/configuration/ConfigurationTest.java | 2 +- flink-dist/src/main/assemblies/bin.xml | 2 +- flink-examples/flink-java-examples/pom.xml | 4 +- .../examples/java/relational/TPCHQuery3.java | 2 +- flink-examples/flink-scala-examples/pom.xml | 6 +- .../aggregation/AvgAggregationFunction.java | 2 +- .../flink/api/java/tuple/TupleGenerator.java | 8 +- flink-java8/pom.xml | 2 +- .../optimizer/traversals/package-info.java | 2 +- .../main/resources/archetype-resources/pom.xml | 8 +- .../archetype-resources/src/main/java/Job.java | 4 +- .../main/resources/archetype-resources/pom.xml | 8 +- .../src/main/scala/Job.scala | 4 +- .../src/main/java/LocalJob.java | 4 +- .../src/main/java/YarnJob.java | 4 +- .../messages/checkpoint/package-info.java | 2 +- .../flink/runtime/messages/package-info.java | 2 +- .../flink/runtime/util/JarFileCreator.java | 4 +- .../flink/runtime/util/JarFileCreatorTest.java | 2 +- .../org/apache/flink/api/scala/DataSet.scala | 2 +- .../src/test/assembly/test-assembly.xml | 2 +- .../datatypes/HadoopFileOutputCommitter.java | 2 +- .../api/java/python/PythonPlanBinder.java | 4 +- .../ml/experimental/ChainedPredictor.scala | 67 +++++++++++ .../ml/experimental/ChainedTransformer.scala | 65 +++++++++++ .../flink/ml/experimental/Estimator.scala | 110 +++++++++++++++++++ .../apache/flink/ml/experimental/KMeans.scala | 50 +++++++++ .../apache/flink/ml/experimental/Offset.scala | 50 +++++++++ .../flink/ml/experimental/Predictor.scala | 87 +++++++++++++++ .../apache/flink/ml/experimental/Scaler.scala | 52 +++++++++ .../flink/ml/experimental/Transformer.scala | 94 ++++++++++++++++ .../org/apache/flink/ml/math/CanCopy.scala | 23 ++++ .../org/apache/flink/ml/math/DenseVector.scala | 11 +- .../org/apache/flink/ml/math/SparseVector.scala | 2 +- .../scala/org/apache/flink/ml/math/Vector.scala | 8 +- .../org/apache/flink/ml/math/package.scala | 2 + .../ml/experimental/SciKitPipelineSuite.scala | 70 ++++++++++++ .../flink-streaming-examples/pom.xml | 2 +- .../apache/flink/api/table/package-info.java | 2 +- .../apache/flink/api/scala/table/package.scala | 2 +- .../flink/api/table/expressions/package.scala | 2 +- .../org/apache/flink/api/table/package.scala | 2 +- .../apache/flink/api/table/plan/package.scala | 2 +- .../flink/api/table/runtime/package.scala | 2 +- .../tachyon/TachyonFileSystemWrapperTest.java | 2 +- .../apache/flink/tez/examples/TPCHQuery3.java | 2 +- .../test/assembly/test-custominput-assembly.xml | 2 +- .../src/test/assembly/test-kmeans-assembly.xml | 2 +- .../test-streamingclassloader-assembly.xml | 2 +- .../test/recordJobTests/TPCHQuery10ITCase.java | 2 +- .../test/recordJobTests/TPCHQuery3ITCase.java | 2 +- .../TPCHQuery3WithUnionITCase.java | 2 +- .../test/recordJobTests/TPCHQuery4ITCase.java | 2 +- .../test/recordJobTests/TPCHQuery9ITCase.java | 2 +- flink-yarn-tests/pom.xml | 2 +- .../org/apache/flink/yarn/YarnTestBase.java | 2 +- .../org/apache/flink/yarn/FlinkYarnClient.java | 2 +- 58 files changed, 749 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-clients/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/assembly/test-assembly.xml b/flink-clients/src/test/assembly/test-assembly.xml index aa7b7d1..60b27d2 100644 --- a/flink-clients/src/test/assembly/test-assembly.xml +++ b/flink-clients/src/test/assembly/test-assembly.xml @@ -27,7 +27,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> + <!--modify/add include to match your pipeline(s) --> <includes> <include>org/apache/flink/client/testjar/**</include> </includes> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index e131892..f7039c7 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -28,7 +28,7 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; /** - * This class contains test for the configuration package. In particular, the serialization of {@link Configuration} + * This class contains test for the configuration pipeline. In particular, the serialization of {@link Configuration} * objects is tested. */ public class ConfigurationTest { http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-dist/src/main/assemblies/bin.xml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 6a429ee..d9dacf3 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -130,7 +130,7 @@ under the License. </excludes> </fileSet> <fileSet> - <!-- copy python package --> + <!-- copy python pipeline --> <directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory> <outputDirectory>resources/python/</outputDirectory> <fileMode>0755</fileMode> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-examples/flink-java-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml index a7964cb..2a335b5 100644 --- a/flink-examples/flink-java-examples/pom.xml +++ b/flink-examples/flink-java-examples/pom.xml @@ -205,7 +205,7 @@ under the License. <!-- <execution> <id>TPCHQuery10</id> - <phase>package</phase> + <phase>pipeline</phase> <goals> <goal>jar</goal> </goals> @@ -228,7 +228,7 @@ under the License. <!-- <execution> <id>TPCHQuery3</id> - <phase>package</phase> + <phase>pipeline</phase> <goals> <goal>jar</goal> </goals> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java index 9a6e58c..e66493a 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java @@ -135,7 +135,7 @@ public class TPCHQuery3 { } }); - // Join customers with orders and package them into a ShippingPriorityItem + // Join customers with orders and pipeline them into a ShippingPriorityItem DataSet<ShippingPriorityItem> customerWithOrders = customers.join(orders).where(0).equalTo(1) .with( http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-examples/flink-scala-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml index 5127c48..14fa874 100644 --- a/flink-examples/flink-scala-examples/pom.xml +++ b/flink-examples/flink-scala-examples/pom.xml @@ -170,7 +170,7 @@ under the License. </configuration> </plugin> - <!-- get default data from flink-java-examples package --> + <!-- get default data from flink-java-examples pipeline --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> @@ -357,7 +357,7 @@ under the License. <!-- <execution> <id>TPCHQuery10</id> - <phase>package</phase> + <phase>pipeline</phase> <goals> <goal>jar</goal> </goals> @@ -380,7 +380,7 @@ under the License. <!-- <execution> <id>TPCHQuery3</id> - <phase>package</phase> + <phase>pipeline</phase> <goals> <goal>jar</goal> </goals> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java index b433d66..1472cd9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java @@ -17,7 +17,7 @@ package org.apache.flink.api.java.aggregation; * limitations under the License. */ -//package org.apache.flink.api.java.aggregation; +//pipeline org.apache.flink.api.java.aggregation; // // //public abstract class AvgAggregationFunction<T> extends AggregationFunction<T> { http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index 03826fc..2149180 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -524,8 +524,8 @@ class TupleGenerator { // head w.print(HEADER); - // package and imports - w.println("package " + PACKAGE + ';'); + // pipeline and imports + w.println("pipeline " + PACKAGE + ';'); w.println(); w.println("import org.apache.flink.util.StringUtils;"); w.println(); @@ -780,8 +780,8 @@ class TupleGenerator { // head w.print(HEADER); - // package and imports - w.println("package " + PACKAGE + "." + BUILDER_SUFFIX + ';'); + // pipeline and imports + w.println("pipeline " + PACKAGE + "." + BUILDER_SUFFIX + ';'); w.println(); w.println("import java.util.LinkedList;"); w.println("import java.util.List;"); http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-java8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml index 0d82ea2..5293221 100644 --- a/flink-java8/pom.xml +++ b/flink-java8/pom.xml @@ -101,7 +101,7 @@ under the License. </configuration> </plugin> - <!-- get default data from flink-java-examples package --> + <!-- get default data from flink-java-examples pipeline --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java index cd8766c..d125475 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java @@ -17,7 +17,7 @@ */ /** - * This package contains the various traversals over the program plan and the + * This pipeline contains the various traversals over the program plan and the * optimizer DAG (directed acyclic graph) that are made in the course of * the optimization. * http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 30f2315..b1e3fac 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -48,7 +48,7 @@ under the License. <!-- - Execute "mvn clean package -Pbuild-jar" + Execute "mvn clean pipeline -Pbuild-jar" to build a jar file out of this project! How to use the Flink Quickstart pom: @@ -61,11 +61,11 @@ under the License. b) Build a jar for running on the cluster: There are two options for creating a jar from this project - b.1) "mvn clean package" -> this will create a fat jar which contains all + b.1) "mvn clean pipeline" -> this will create a fat jar which contains all dependencies necessary for running the jar created by this pom in a cluster. The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster. - b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much + b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much nicer dependency exclusion handling. This approach is preferred and leads to much cleaner jar files. --> @@ -98,7 +98,7 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> - <!-- Run shade goal on package phase --> + <!-- Run shade goal on pipeline phase --> <execution> <phase>package</phase> <goals> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java index 603fc80..6e813a4 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java @@ -24,12 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; * Skeleton for a Flink Job. * * For a full example of a Flink Job, see the WordCountJob.java file in the - * same package/directory or have a look at the website. + * same pipeline/directory or have a look at the website. * * You can also generate a .jar file that you can submit on your Flink * cluster. * Just type - * mvn clean package + * mvn clean pipeline * in the projects root directory. * You will find the jar in * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index e940b90..299fa24 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -49,7 +49,7 @@ under the License. <!-- - Execute "mvn clean package -Pbuild-jar" + Execute "mvn clean pipeline -Pbuild-jar" to build a jar file out of this project! How to use the Flink Quickstart pom: @@ -62,11 +62,11 @@ under the License. b) Build a jar for running on the cluster: There are two options for creating a jar from this project - b.1) "mvn clean package" -> this will create a fat jar which contains all + b.1) "mvn clean pipeline" -> this will create a fat jar which contains all dependencies necessary for running the jar created by this pom in a cluster. The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster. - b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much + b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much nicer dependency exclusion handling. This approach is preferred and leads to much cleaner jar files. --> @@ -102,7 +102,7 @@ under the License. <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> - <!-- Run shade goal on package phase --> + <!-- Run shade goal on pipeline phase --> <execution> <phase>package</phase> <goals> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala index 3c34b0a..44a7a03 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala @@ -24,12 +24,12 @@ import org.apache.flink.api.scala._ * Skeleton for a Flink Job. * * For a full example of a Flink Job, see the WordCountJob.scala file in the - * same package/directory or have a look at the website. + * same pipeline/directory or have a look at the website. * * You can also generate a .jar file that you can submit on your Flink * cluster. Just type * {{{ - * mvn clean package + * mvn clean pipeline * }}} * in the projects root directory. You will find the jar in * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java index cf7474e..e324420 100644 --- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java +++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java @@ -24,12 +24,12 @@ import org.apache.flink.tez.client.LocalTezEnvironment; * Skeleton for a Flink on Tez Job running using Tez local mode. * * For a full example of a Flink on TezJob, see the WordCountJob.java file in the - * same package/directory or have a look at the website. + * same pipeline/directory or have a look at the website. * * You can also generate a .jar file that you can submit on your Flink * cluster. * Just type - * mvn clean package + * mvn clean pipeline * in the projects root directory. * You will find the jar in * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java ---------------------------------------------------------------------- diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java index 51627d5..1b0bbcf 100644 --- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java +++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java @@ -24,12 +24,12 @@ import org.apache.flink.tez.client.RemoteTezEnvironment; * Skeleton for a Flink on Tez program running on Yarn. * * For a full example of a Flink on Tez program, see the WordCountJob.java file in the - * same package/directory or have a look at the website. + * same pipeline/directory or have a look at the website. * * You can also generate a .jar file that you can submit on your Flink * cluster. * Just type - * mvn clean package + * mvn clean pipeline * in the projects root directory. * You will find the jar in * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java index 7b96b81..7e422b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java @@ -17,7 +17,7 @@ */ /** - * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager} + * This pipeline contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager} * and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the * distributed dataflow. */ http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java index e0b8cce..78620a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java @@ -17,7 +17,7 @@ */ /** - * This package contains the messages that are sent between actors, like the + * This pipeline contains the messages that are sent between actors, like the * {@link org.apache.flink.runtime.jobmanager.JobManager} and * {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations. */ http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java index c55a9dc..9e39777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java @@ -91,10 +91,10 @@ public class JarFileCreator { } /** - * Manually specify the package of the dependencies. + * Manually specify the pipeline of the dependencies. * * @param p - * the package to be included. + * the pipeline to be included. */ public synchronized JarFileCreator addPackage(String p) { this.packages.add(p); http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java index ba207ec..2e39068 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java @@ -199,7 +199,7 @@ public class JarFileCreatorTest { ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class"); ans.add("org/apache/flink/util/Collector.class"); - Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out)); + Assert.assertTrue("Jar file for UDF pipeline is not correct", validate(ans, out)); out.delete(); } http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index e283e95..e146687 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -67,7 +67,7 @@ import scala.reflect.ClassTag * * A rich function can be used when more control is required, for example for accessing the * `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`, all other functions - * are named similarly. All functions are available in package + * are named similarly. All functions are available in pipeline * `org.apache.flink.api.common.functions`. * * The elements are partitioned depending on the parallelism of the http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml index 0f4561a..86563b0 100644 --- a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml +++ b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml @@ -27,7 +27,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> + <!--modify/add include to match your pipeline(s) --> <includes> <include>org/apache/flink/api/avro/testjar/**</include> </includes> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java index ce4955c..b9c28c5 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java @@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils; /** * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext} - * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public. + * as input parameter. However JobContext class is pipeline private, and in Hadoop 2.2.0 it's public. * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks. */ public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java index c278f5c..7d180b6 100644 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java +++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java @@ -125,14 +125,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> { //=====Setup======================================================================================================== /** * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big - * package, and resolves PYTHONPATH issues. + * pipeline, and resolves PYTHONPATH issues. * * @param filePaths * @throws IOException * @throws URISyntaxException */ private void prepareFiles(String... filePaths) throws IOException, URISyntaxException { - //Flink python package + //Flink python pipeline String tempFilePath = FLINK_PYTHON_FILE_PATH; clearPath(tempFilePath); FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false); http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala new file mode 100644 index 0000000..2f36e8c --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap + +case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) + extends Predictor[ChainedPredictor[T, P]]{} + +object ChainedPredictor{ + implicit def chainedPredictOperation[ + T <: Transformer[T], + P <: Predictor[P], + Input, + Testing, + Prediction]( + implicit transform: TransformOperation[T, Input, Testing], + predictor: PredictOperation[P, Testing, Prediction]) + : PredictOperation[ChainedPredictor[T, P], Input, Prediction] = { + + new PredictOperation[ChainedPredictor[T, P], Input, Prediction] { + override def predict( + instance: ChainedPredictor[T, P], + predictParameters: ParameterMap, + input: DataSet[Input]) + : DataSet[Prediction] = { + + val testing = instance.transformer.transform(input, predictParameters) + instance.predictor.predict(testing, predictParameters) + } + } + } + + implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit + leftFitOperation: FitOperation[L, I], + leftTransformOperation: TransformOperation[L, I, T], + rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = { + new FitOperation[ChainedTransformer[L, R], I] { + override def fit( + instance: ChainedTransformer[L, R], + fitParameters: ParameterMap, + input: DataSet[I]) + : Unit = { + instance.left.fit(input, fitParameters) + val intermediateResult = instance.left.transform(input, fitParameters) + instance.right.fit(intermediateResult, fitParameters) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala new file mode 100644 index 0000000..dc9c611 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap + +case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R) + extends Transformer[ChainedTransformer[L, R]] { +} + +object ChainedTransformer{ + implicit def chainedTransformOperation[ + L <: Transformer[L], + R <: Transformer[R], + I, + T, + O](implicit + transformLeft: TransformOperation[L, I, T], + transformRight: TransformOperation[R, T, O]) + : TransformOperation[ChainedTransformer[L,R], I, O] = { + + new TransformOperation[ChainedTransformer[L, R], I, O] { + override def transform( + chain: ChainedTransformer[L, R], + transformParameters: ParameterMap, + input: DataSet[I]): DataSet[O] = { + val intermediateResult = transformLeft.transform(chain.left, transformParameters, input) + transformRight.transform(chain.right, transformParameters, intermediateResult) + } + } + } + + implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit + leftFitOperation: FitOperation[L, I], + leftTransformOperation: TransformOperation[L, I, T], + rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = { + new FitOperation[ChainedTransformer[L, R], I] { + override def fit( + instance: ChainedTransformer[L, R], + fitParameters: ParameterMap, + input: DataSet[I]): Unit = { + instance.left.fit(input, fitParameters) + val intermediateResult = instance.left.transform(input, fitParameters) + instance.right.fit(intermediateResult, fitParameters) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala new file mode 100644 index 0000000..e0c81a4 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +trait Estimator[Self] extends WithParameters with Serializable { + that: Self => + + def fit[Training]( + input: DataSet[Training], + fitParameters: ParameterMap = ParameterMap.Empty)(implicit + fitOperation: FitOperation[Self, Training]): Unit = { + fitOperation.fit(this, fitParameters, input) + } +} + +object Estimator{ + implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag] + : FitOperation[Self, Training] = { + new FitOperation[Self, Training]{ + override def fit( + instance: Self, + fitParameters: ParameterMap, + input: DataSet[Training]) + : Unit = { + new FitOperation[Self, Training] { + override def fit( + instance: Self, + fitParameters: ParameterMap, + input: DataSet[Training]) + : Unit = { + val self = implicitly[ClassTag[Self]] + val training = implicitly[ClassTag[Training]] + + throw new RuntimeException("There is no FitOperation defined for " + self.runtimeClass + + " which trains on a DataSet[" + training.runtimeClass + "]") + } + } + } + } + } + + implicit def fallbackChainedFitOperationTransformer[ + L <: Transformer[L], + R <: Transformer[R], + LI, + LO, + RI](implicit + leftFitOperation: FitOperation[L, LI], + leftTransformOperation: TransformOperation[L, LI, LO], + rightFitOperaiton: FitOperation[R, RI]) + : FitOperation[ChainedTransformer[L, R], LI] = { + new FitOperation[ChainedTransformer[L, R], LI] { + override def fit( + instance: ChainedTransformer[L, R], + fitParameters: ParameterMap, + input: DataSet[LI]): Unit = { + instance.left.fit(input, fitParameters) + instance.left.transform(input, fitParameters) + instance.right.fit(null, fitParameters) + } + } + } + + implicit def fallbackChainedFitOperationPredictor[ + L <: Transformer[L], + R <: Predictor[R], + LI, + LO, + RI](implicit + leftFitOperation: FitOperation[L, LI], + leftTransformOperation: TransformOperation[L, LI, LO], + rightFitOperaiton: FitOperation[R, RI]) + : FitOperation[ChainedPredictor[L, R], LI] = { + new FitOperation[ChainedPredictor[L, R], LI] { + override def fit( + instance: ChainedPredictor[L, R], + fitParameters: ParameterMap, + input: DataSet[LI]): Unit = { + instance.transformer.fit(input, fitParameters) + instance.transformer.transform(input, fitParameters) + instance.predictor.fit(null, fitParameters) + } + } + } +} + +trait FitOperation[Self, Training]{ + def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala new file mode 100644 index 0000000..5acd34f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{ParameterMap, LabeledVector} +import org.apache.flink.ml.math._ + +class KMeans extends Predictor[KMeans] { +} + +object KMeans{ + + implicit val kMeansEstimator = new FitOperation[KMeans, LabeledVector] { + override def fit( + instance: KMeans, + parameters: ParameterMap, + input: DataSet[LabeledVector]): Unit = { + input.print + } + } + + implicit def kMeansPredictor[V <: Vector] + = new PredictOperation[KMeans, V, LabeledVector] { + override def predict( + instance: KMeans, + parameters: ParameterMap, + input: DataSet[V]): DataSet[LabeledVector] = { + input.map{ + vector => LabeledVector(1.0, vector) + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala new file mode 100644 index 0000000..c9d082f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import scala.reflect.ClassTag + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.ml.math._ + +class Offset extends Transformer[Offset] { +} + +object Offset{ + import Breeze._ + + implicit def offsetTransform[I <: Vector : CanCopy: ClassTag: TypeInformation] + = new TransformOperation[Offset, I, I] { + override def transform( + offset: Offset, + parameters: ParameterMap, + input: DataSet[I]): DataSet[I] = { + input.map{ + vector => + val brz = copy(vector).asBreeze + + val result = brz + 1.0 + + result.fromBreeze.asInstanceOf[I] + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala new file mode 100644 index 0000000..8ec6665 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +trait Predictor[Self] extends Estimator[Self] with WithParameters with Serializable { + that: Self => + + def predict[Testing, Prediction]( + input: DataSet[Testing], + predictParameters: ParameterMap = ParameterMap.Empty)(implicit + predictor: PredictOperation[Self, Testing, Prediction]) + : DataSet[Prediction] = { + predictor.predict(this, predictParameters, input) + } +} + +object Predictor{ + implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag, Prediction: ClassTag] + : PredictOperation[Self, Testing, Prediction] = { + new PredictOperation[Self, Testing, Prediction] { + override def predict( + instance: Self, + predictParameters: ParameterMap, + input: DataSet[Testing]) + : DataSet[Prediction] = { + val self = implicitly[ClassTag[Self]] + val testing = implicitly[ClassTag[Testing]] + val prediction = implicitly[ClassTag[Prediction]] + + throw new RuntimeException("There is no PredictOperation defined for " + self.runtimeClass + + " which takes a DataSet[" + testing.runtimeClass + "] as input and returns a DataSet[" + + prediction.runtimeClass + "]") + } + } + } + + implicit def fallbackChainedPredictOperation[ + L <: Transformer[L], + R <: Predictor[R], + LI, + LO, + RI, + RO](implicit + leftTransformOperation: TransformOperation[L, LI, LO], + rightPredictOperation: PredictOperation[R, RI, RO] + ) + : PredictOperation[ChainedPredictor[L, R], LI, RO] = { + new PredictOperation[ChainedPredictor[L, R], LI, RO] { + override def predict( + instance: ChainedPredictor[L, R], + predictParameters: ParameterMap, + input: DataSet[LI]): DataSet[RO] = { + instance.transformer.transform(input, predictParameters) + instance.predictor.predict(null, predictParameters) + } + } + } +} + +abstract class PredictOperation[Self, Testing, Prediction]{ + def predict( + instance: Self, + predictParameters: ParameterMap, + input: DataSet[Testing]) + : DataSet[Prediction] +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala new file mode 100644 index 0000000..a68c5d3 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import scala.reflect.ClassTag + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.ml.math._ + +class Scaler extends Transformer[Scaler] { + var meanValue = 0.0 +} + +object Scaler{ + import Breeze._ + + implicit def vTransform[T <: Vector : CanCopy: ClassTag: TypeInformation] + = new TransformOperation[Scaler, T, T] { + override def transform( + instance: Scaler, + parameters: ParameterMap, + input: DataSet[T]): DataSet[T] = { + input.map{ + vector => + val breezeVector = copy(vector).asBreeze + instance.meanValue = instance.meanValue + breeze.stats.mean(breezeVector) + + breezeVector :/= instance.meanValue + + breezeVector.fromBreeze.asInstanceOf[T] + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala new file mode 100644 index 0000000..e49b3a3 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import scala.reflect.ClassTag + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.{ParameterMap, WithParameters} + +trait Transformer[Self <: Transformer[Self]] + extends Estimator[Self] + with WithParameters + with Serializable { + that: Self => + + def transform[I, O](input: DataSet[I], transformParameters: ParameterMap = ParameterMap.Empty) + (implicit transformOperation: TransformOperation[Self, I, O]): DataSet[O] = { + transformOperation.transform(that, transformParameters, input) + } + + def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T] = { + ChainedTransformer(this, transformer) + } + + def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P] = { + ChainedPredictor(this, predictor) + } +} + +object Transformer{ + implicit def fallbackChainedTransformOperation[ + L <: Transformer[L], + R <: Transformer[R], + LI, + LO, + RI, + RO] + (implicit transformLeft: TransformOperation[L, LI, LO], + transformRight: TransformOperation[R, RI, RO]) + : TransformOperation[ChainedTransformer[L,R], LI, RO] = { + + new TransformOperation[ChainedTransformer[L, R], LI, RO] { + override def transform( + chain: ChainedTransformer[L, R], + transformParameters: ParameterMap, + input: DataSet[LI]): DataSet[RO] = { + transformLeft.transform(chain.left, transformParameters, input) + transformRight.transform(chain.right, transformParameters, null) + } + } + } + + implicit def fallbackTransformOperation[ + Self: ClassTag, + IN: ClassTag, + OUT: ClassTag] + : TransformOperation[Self, IN, OUT] = { + new TransformOperation[Self, IN, OUT] { + override def transform( + instance: Self, + transformParameters: ParameterMap, + input: DataSet[IN]) + : DataSet[OUT] = { + val self = implicitly[ClassTag[Self]] + val in = implicitly[ClassTag[IN]] + val out = implicitly[ClassTag[OUT]] + + throw new RuntimeException("There is no TransformOperation defined for " + + self.runtimeClass + " which takes a DataSet[" + in.runtimeClass + + "] as input and transforms it into a DataSet[" + out.runtimeClass + "]") + } + } + } +} + +abstract class TransformOperation[Self, IN, OUT] extends Serializable{ + def transform(instance: Self, transformParameters: ParameterMap, input: DataSet[IN]): DataSet[OUT] +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala new file mode 100644 index 0000000..b73b249 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.math + +trait CanCopy[T] extends Serializable { + def copy(value: T): T +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala index 851a283..fa34ae1 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala @@ -24,7 +24,10 @@ package org.apache.flink.ml.math * * @param data Array of doubles to store the vector elements */ -case class DenseVector(val data: Array[Double]) extends Vector with Serializable { +case class DenseVector( + val data: Array[Double]) + extends Vector + with Serializable { /** * Number of elements in a vector @@ -65,7 +68,7 @@ case class DenseVector(val data: Array[Double]) extends Vector with Serializable * * @return Copy of the vector instance */ - override def copy: Vector = { + override def copy: DenseVector = { DenseVector(data.clone()) } @@ -131,4 +134,8 @@ object DenseVector { def init(size: Int, value: Double): DenseVector = { new DenseVector(Array.fill(size)(value)) } + + implicit val canCopy = new CanCopy[DenseVector]{ + override def copy(value: DenseVector): DenseVector = value.copy + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala index 0762efb..ddfa084 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala @@ -49,7 +49,7 @@ case class SparseVector( * * @return Copy of the vector instance */ - override def copy: Vector = { + override def copy: SparseVector = { new SparseVector(size, indices.clone, data.clone) } http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala index 83e0c65..739fb9c 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala @@ -21,7 +21,7 @@ package org.apache.flink.ml.math /** Base trait for Vectors * */ -trait Vector { +trait Vector extends Serializable { /** Number of elements in a vector * @@ -72,3 +72,9 @@ trait Vector { } } } + +object Vector{ + implicit val canCopy = new CanCopy[Vector] { + override def copy(value: Vector): Vector = value.copy + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala index 4c7f254..e0f43d6 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala @@ -107,4 +107,6 @@ package object math { } } + + def copy[T](value: T)(implicit canCopy: CanCopy[T]): T = canCopy.copy(value) } http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala new file mode 100644 index 0000000..a185282 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.experimental + +import org.scalatest.FlatSpec + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{SparseVector, DenseVector, Vector} +import org.apache.flink.test.util.FlinkTestBase + +class SciKitPipelineSuite extends FlatSpec with FlinkTestBase { + behavior of "Pipeline" + + it should "work" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val scaler = new Scaler + val offset = new Offset + + val input: DataSet[Vector] = env.fromCollection(List(DenseVector(2,1,3), SparseVector.fromCOO(3, (1,1), (2,2)))) + val training = env.fromCollection(List(LabeledVector(1.0, DenseVector(2,3,1)), LabeledVector(2.0, SparseVector.fromCOO(3, (1,1), (2,2))))) + val intData = env.fromCollection(List(1,2,3,4)) + + val result = scaler.transform(input) + + result.print() + + val result2 = offset.transform(input) + result2.print() + + val chain = scaler.chainTransformer(offset) + + val result3 = chain.transform(input)(ChainedTransformer.chainedTransformOperation(Scaler.vTransform, Offset.offsetTransform)) + + result3.print() + + val chain2 = chain.chainTransformer(scaler) + val result4 = chain2.transform(input) + + result4.print() + + val kmeans = new KMeans() + + val chainedPredictor = chain.chainPredictor(kmeans) + + val prediction = chainedPredictor.predict(result) + + prediction.print() + + env.execute() + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml index 2ebd606..d039a8b 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml +++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml @@ -84,7 +84,7 @@ under the License. <build> <plugins> - <!-- get default data from flink-java-examples package --> + <!-- get default data from flink-java-examples pipeline --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java index d7fbc8e..188ac4a 100644 --- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java +++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java @@ -19,7 +19,7 @@ /** * <strong>Table API</strong><br> * - * This package contains the generic part of the Table API. It can be used with Flink Streaming + * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming * and Flink Batch. From Scala as well as from Java. * * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index e74651b..37c5937 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -26,7 +26,7 @@ import scala.language.implicitConversions /** * == Table API (Scala) == * - * Importing this package with: + * Importing this pipeline with: * * {{{ * import org.apache.flink.api.scala.table._ http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala index c5c8c94..f50ca02 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table /** - * This package contains the base class of AST nodes and all the expression language AST classes. + * This pipeline contains the base class of AST nodes and all the expression language AST classes. * Expression trees should not be manually constructed by users. They are implicitly constructed * from the implicit DSL conversions in * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala index bdcb22c..a31ec61 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala @@ -20,7 +20,7 @@ package org.apache.flink.api /** * == Table API == * - * This package contains the generic part of the Table API. It can be used with Flink Streaming + * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming * and Flink Batch. From Scala as well as from Java. * * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala index a598483..adb9890 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table /** - * The operations in this package are created by calling methods on [[Table]] they + * The operations in this pipeline are created by calling methods on [[Table]] they * should not be manually created by users of the API. */ package object plan http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala index a1bc4b7..155a17e 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala @@ -18,6 +18,6 @@ package org.apache.flink.api.table /** - * The functions in this package are used transforming Table API operations to Java API operations. + * The functions in this pipeline are used transforming Table API operations to Java API operations. */ package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java index 3b2fb7f..fb80798 100644 --- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java +++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java @@ -150,7 +150,7 @@ public class TachyonFileSystemWrapperTest { } } - // package visible + // pipeline visible static final class DopOneTestEnvironment extends LocalEnvironment { static { initializeContextEnvironment(new ExecutionEnvironmentFactory() { http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java index d61f80e..5c30785 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java @@ -86,7 +86,7 @@ public class TPCHQuery3 { } }); - // Join customers with orders and package them into a ShippingPriorityItem + // Join customers with orders and pipeline them into a ShippingPriorityItem DataSet<ShippingPriorityItem> customerWithOrders = customers.join(orders).where(0).equalTo(1) .with( http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/assembly/test-custominput-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml index e6f3568..18adc47 100644 --- a/flink-tests/src/test/assembly/test-custominput-assembly.xml +++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml @@ -28,7 +28,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> + <!--modify/add include to match your pipeline(s) --> <includes> <include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram.class</include> <include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/assembly/test-kmeans-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml index a8d34ab..3c547fb 100644 --- a/flink-tests/src/test/assembly/test-kmeans-assembly.xml +++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml @@ -28,7 +28,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> + <!--modify/add include to match your pipeline(s) --> <includes> <include>org/apache/flink/test/classloading/jar/KMeansForTest.class</include> <include>org/apache/flink/test/classloading/jar/KMeansForTest$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml index 8321b21..b311700 100644 --- a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml +++ b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml @@ -28,7 +28,7 @@ under the License. <fileSet> <directory>${project.build.testOutputDirectory}</directory> <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> + <!--modify/add include to match your pipeline(s) --> <includes> <include>org/apache/flink/test/classloading/jar/StreamingProgram.class</include> <include>org/apache/flink/test/classloading/jar/StreamingProgram$*.class</include> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java index 349275c..b3734a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java @@ -92,7 +92,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|R|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|R|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|R|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|R|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|R|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java index a0236c2..7dc76b5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java @@ -85,7 +85,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3WithUnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3WithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3WithUnionITCase.java index 3ade964..d0503f5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3WithUnionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3WithUnionITCase.java @@ -81,7 +81,7 @@ public class TPCHQuery3WithUnionITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery4ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery4ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery4ITCase.java index 30c1b3e..9300bbd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery4ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery4ITCase.java @@ -79,7 +79,7 @@ public class TPCHQuery4ITCase extends RecordAPITestBase { + "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery9ITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery9ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery9ITCase.java index f092400..28e3214 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery9ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery9ITCase.java @@ -239,7 +239,7 @@ public class TPCHQuery9ITCase extends RecordAPITestBase { + "32|275|776|4|4|4701.08|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n" + "32|19|11|5|44|65585.52|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n" + "32|1162|414|6|6|6378.96|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n" - + "33|6134|903|1|31|32244.03|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n" + + "33|6134|903|1|31|32244.03|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n" + "33|6052|565|2|32|30657.60|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n" + "33|13747|11|3|5|8303.70|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n" + "33|19|9|4|41|53110.99|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n" http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index 2a63978..4be723d 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -28,7 +28,7 @@ under the License. </parent> <!-- - There is a separate "flink-yarn-tests" package that expects the "flink-dist" package + There is a separate "flink-yarn-tests" pipeline that expects the "flink-dist" pipeline to be build before. We need the YARN fat jar build by flink-dist for the tests. --> http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java index 7950792..f3d1263 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java @@ -65,7 +65,7 @@ import java.util.concurrent.ConcurrentMap; * This base class allows to use the MiniYARNCluster. * The cluster is re-used for all tests. * - * This class is located in a different package which is build after flink-dist. This way, + * This class is located in a different pipeline which is build after flink-dist. This way, * we can use the YARN uberjar of flink to start a Flink YARN session. * * The test is not thread-safe. Parallel execution of tests is not possible! http://git-wip-us.apache.org/repos/asf/flink/blob/fde0341f/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 502d72d..c439d09 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; /** - * All classes in this package contain code taken from + * All classes in this pipeline contain code taken from * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc * and * https://github.com/hortonworks/simple-yarn-app