[streaming] [scala] Restructured streaming Scala project and examples. This closes #275
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8183c8c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8183c8c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8183c8c3 Branch: refs/heads/master Commit: 8183c8c3d0bf720d3fb8b407615fc033dcf169dd Parents: 3f1af0e Author: mbalassi <[email protected]> Authored: Sun Jan 4 12:44:16 2015 +0100 Committer: mbalassi <[email protected]> Committed: Sun Jan 4 20:54:00 2015 +0100 ---------------------------------------------------------------------- .../flink-streaming-examples/pom.xml | 137 ++++- .../examples/windowing/TopSpeedWindowing.scala | 98 ++++ .../scala/examples/windowing/WindowJoin.scala | 71 +++ .../flink-streaming-scala/pom.xml | 217 ++++++++ .../scala/ScalaStreamingAggregator.java | 111 ++++ .../api/scala/ConnectedDataStream.scala | 381 +++++++++++++ .../flink/streaming/api/scala/DataStream.scala | 558 +++++++++++++++++++ .../streaming/api/scala/SplitDataStream.scala | 50 ++ .../api/scala/StreamCrossOperator.scala | 111 ++++ .../api/scala/StreamExecutionEnvironment.scala | 275 +++++++++ .../api/scala/StreamJoinOperator.scala | 204 +++++++ .../api/scala/StreamingConversions.scala | 40 ++ .../api/scala/WindowedDataStream.scala | 209 +++++++ .../streaming/api/scala/windowing/Delta.scala | 47 ++ .../streaming/api/scala/windowing/Time.scala | 56 ++ flink-addons/flink-streaming/pom.xml | 1 + .../streaming/windowing/TopSpeedWindowing.scala | 96 ---- .../scala/streaming/windowing/WindowJoin.scala | 71 --- flink-scala/pom.xml | 6 - .../streaming/ScalaStreamingAggregator.java | 111 ---- .../scala/streaming/ConnectedDataStream.scala | 380 ------------- .../flink/api/scala/streaming/DataStream.scala | 558 ------------------- .../api/scala/streaming/SplitDataStream.scala | 50 -- .../scala/streaming/StreamCrossOperator.scala | 112 ---- .../streaming/StreamExecutionEnvironment.scala | 278 --------- .../scala/streaming/StreamJoinOperator.scala | 202 ------- .../scala/streaming/StreamingConversions.scala | 40 -- .../scala/streaming/WindowedDataStream.scala | 214 ------- .../api/scala/streaming/windowing/Delta.scala | 47 -- .../api/scala/streaming/windowing/Time.scala | 55 -- 30 files changed, 2565 insertions(+), 2221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml index 64be993..3661726 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml +++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml @@ -43,6 +43,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-java-examples</artifactId> <version>${project.version}</version> </dependency> @@ -56,7 +62,6 @@ under the License. <build> <plugins> - <!-- get default data from flink-java-examples package --> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -346,6 +351,136 @@ under the License. 
</execution> </executions> </plugin> + +<!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + 
<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
 		</plugins>
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..dc01f02
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.util.Collector
+import scala.math.{max, min}
+
+import scala.util.Random
+
+import org.apache.flink.streaming.api.scala.windowing.Time
+import org.apache.flink.streaming.api.scala.windowing.Delta
+
+/**
+ * An example of grouped stream windowing where different eviction and
+ * trigger policies can be used. A source fetches events from cars
+ * every second, containing their id, their current speed (km/h),
+ * their overall elapsed distance (m) and a timestamp. The example
+ * emits the top speed of each car over the last y seconds, triggered
+ * every x meters of elapsed distance.
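+ *
+ * Usage sketch (the arguments are optional; the defaults defined at the
+ * bottom of the object are used when none are given):
+ * {{{
+ *   TopSpeedWindowing <numCars> <evictSec> <triggerMeters>
+ * }}}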
+ */
+object TopSpeedWindowing {
+
+  case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val cars = env.addSource(carSource _).groupBy("carId")
+      .window(Time.of(evictionSec, SECONDS))
+      .every(Delta.of[CarSpeed](triggerMeters,
+        (oldSp, newSp) => newSp.distance - oldSp.distance, CarSpeed(0, 0, 0, 0)))
+      .reduce((x, y) => if (x.speed > y.speed) x else y)
+
+    cars.print()
+
+    env.execute("TopSpeedWindowing")
+  }
+
+  def carSource(out: Collector[CarSpeed]) = {
+
+    val speeds = new Array[Int](numOfCars)
+    val distances = new Array[Double](numOfCars)
+
+    while (true) {
+      Thread.sleep(1000)
+      for (i <- 0 until speeds.length) {
+        speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
+        distances(i) += speeds(i) / 3.6d
+        out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
+      }
+    }
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+      }
+      else {
+        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+        return false
+      }
+    }
+    true
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
new file mode 100644
index 0000000..a19e4b4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.util.Collector
+import scala.util.Random
+
+object WindowJoin {
+
+  case class Name(id: Long, name: String)
+  case class Age(id: Long, age: Int)
+  case class Person(name: String, age: Long)
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Create streams for names and ages by mapping the inputs to the corresponding objects
+    val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
+    val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
+
+    // Join the two input streams by id over the last second and create new
+    // Person objects containing both name and age
+    val joined =
+      names.join(ages).onWindow(1000)
+        .where("id").equalTo("id") { (n, a) => Person(n.name, a.age) }
+
+    joined.print()
+
+    env.execute("WindowJoin")
+  }
+
+  // Stream source for generating (id, name) pairs
+  def nameStream(out: Collector[(Long, String)]) = {
+    val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
+
+    for (i <- 1 to 10000) {
+      if (i % 100 == 0) Thread.sleep(1000) else {
+        out.collect((i, names(Random.nextInt(names.length))))
+      }
+    }
+  }
+
+  // Stream source for generating (id, age) pairs
+  def ageStream(out: Collector[(Long, Int)]) = {
+    for (i <- 1 to 10000) {
+      if (i % 100 == 0) Thread.sleep(1000) else {
+        out.collect((i, Random.nextInt(90)))
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/pom.xml b/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
new file mode 100644
index 0000000..c06fba7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/pom.xml
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-parent</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-streaming-scala</artifactId> + <name>flink-streaming-scala</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </dependency> + + <dependency> + <groupId>org.scalamacros</groupId> + <artifactId>quasiquotes_${scala.binary.version}</artifactId> + <version>${scala.macros.version}</version> + </dependency> + + <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </dependency> + + <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + 
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java b/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java new file mode 100644 index 0000000..77d102d --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/java/org/apache/flink/api/streaming/scala/ScalaStreamingAggregator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.streaming.scala; + +import java.io.Serializable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator; +import org.apache.flink.streaming.api.function.aggregation.SumFunction; + +import scala.Product; + +public class ScalaStreamingAggregator<IN extends Product> implements Serializable { + + private static final long serialVersionUID = 1L; + + TupleSerializerBase<IN> serializer; + Object[] fields; + int length; + int position; + + public ScalaStreamingAggregator(TypeSerializer<IN> serializer, int pos) { + this.serializer = (TupleSerializerBase<IN>) serializer; + this.length = this.serializer.getArity(); + this.fields = new Object[this.length]; + this.position = pos; + } + + public class Sum extends AggregationFunction<IN> { + private static final long serialVersionUID = 1L; + SumFunction sumFunction; + + public Sum(SumFunction func) { + super(ScalaStreamingAggregator.this.position); + this.sumFunction = func; + } + + @Override + public IN reduce(IN value1, IN value2) throws Exception { + for (int i = 0; i < length; i++) { + fields[i] = value2.productElement(i); + } + + fields[position] = sumFunction.add(fields[position], value1.productElement(position)); + + return serializer.createInstance(fields); + } + } + + public class ProductComparableAggregator extends ComparableAggregator<IN> { + + private static final long serialVersionUID = 1L; + + public ProductComparableAggregator(AggregationFunction.AggregationType aggregationType, + boolean first) { + super(ScalaStreamingAggregator.this.position, aggregationType, first); + } + + @SuppressWarnings("unchecked") + @Override + public IN reduce(IN value1, IN value2) throws Exception { + Object v1 = value1.productElement(position); + Object v2 = value2.productElement(position); + + int c = comparator.isExtremal((Comparable<Object>) v1, v2); + + if (byAggregate) { + if (c == 1) { + return value1; + } + if (first) { + if (c == 0) { + return value1; + } + } + + return value2; + } else { + for (int i = 0; i < length; i++) { + fields[i] = value2.productElement(i); + } + + if (c == 1) { + fields[position] = v1; + } + + return serializer.createInstance(fields); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala new file mode 100644 index 0000000..320bfa0 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
+import org.apache.flink.streaming.api.function.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
+import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.util.Collector
+
+class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls fun1 for each
+   * element of the first input and fun2 for each element of the second
+   * input. Each function call returns exactly one element.
+   *
+   * @param fun1
+   *          The function used to transform the elements of the first input
+   * @param fun2
+   *          The function used to transform the elements of the second input
+   * @return The transformed {@link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
+  DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val comapper = new CoMapFunction[IN1, IN2, R] {
+      def map1(in1: IN1): R = clean(fun1)(in1)
+      def map2(in2: IN2): R = clean(fun2)(in2)
+    }
+
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](comapper)))
+  }
+
+  /**
+   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
+   * the output to a common type. The transformation calls
+   * {@link CoMapFunction#map1} for each element of the first input and
+   * {@link CoMapFunction#map2} for each element of the second input. Each
+   * CoMapFunction call returns exactly one element. The user can also extend
+   * {@link RichCoMapFunction} to gain access to other features provided by
+   * the {@link RichFunction} interface.
+   *
+   * @param coMapper
+   *          The CoMapFunction used to jointly transform the two input
+   *          DataStreams
+   * @return The transformed {@link DataStream}
+   */
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]):
+  DataStream[R] = {
+    if (coMapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]],
+      new CoMapInvokable[IN1, IN2, R](coMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls
+   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+   * input. Each CoFlatMapFunction call returns any number of elements
+   * including none. The user can also extend {@link RichFlatMapFunction} to
+   * gain access to other features provided by the {@link RichFunction}
+   * interface.
+   *
+   * @param coFlatMapper
+   *          The CoFlatMapFunction used to jointly transform the two input
+   *          DataStreams
+   * @return The transformed {@link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
+  DataStream[R] = {
+    if (coFlatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]],
+      new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper)))
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
+   * maps the output to a common type. The transformation calls fun1 for each
+   * element of the first input and fun2 for each element of the second
+   * input. Each function call can return any number of elements including
+   * none.
+   *
+   * @param fun1
+   *          The function used to transform the elements of the first input
+   * @param fun2
+   *          The function used to transform the elements of the second input
+   * @return The transformed {@link DataStream}
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit,
+      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out)
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
+   * applying functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}.
+   *
+   * @param keyPosition1
+   *          The field used to compute the hashcode of the elements in the
+   *          first input stream.
+   * @param keyPosition2
+   *          The field used to compute the hashcode of the elements in the
+   *          second input stream.
+   * @return The grouped {@link ConnectedDataStream}
+   */
+  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+   * applying functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}.
+   *
+   * @param keyPositions1
+   *          The fields used to group the first input stream.
+   * @param keyPositions2
+   *          The fields used to group the second input stream.
+   * @return The grouped {@link ConnectedDataStream}
+   */
+  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
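+   *
+   * A hypothetical sketch (both input streams are assumed to carry case
+   * classes exposing an {@code id} field):
+   * {{{
+   *   val grouped = first.connect(second).groupBy("id", "id")
+   * }}}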
+   *
+   * @param field1
+   *          The grouping expression for the first input
+   * @param field2
+   *          The grouping expression for the second input
+   * @return The grouped {@link ConnectedDataStream}
+   */
+  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(field1, field2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream using key expressions. Groups
+   * the elements of input1 and input2 according to fields1 and fields2. A
+   * field expression is either the name of a public field or a getter method
+   * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+   * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   *
+   * @param fields1
+   *          The grouping expressions for the first input
+   * @param fields2
+   *          The grouping expressions for the second input
+   * @return The grouped {@link ConnectedDataStream}
+   */
+  def groupBy(fields1: Array[String], fields2: Array[String]):
+  ConnectedDataStream[IN1, IN2] = {
+    javaStream.groupBy(fields1, fields2)
+  }
+
+  /**
+   * GroupBy operation for connected data stream. Groups the elements of
+   * input1 and input2 using fun1 and fun2. Used for applying
+   * functions on grouped data streams, for example
+   * {@link ConnectedDataStream#reduce}.
+   *
+   * @param fun1
+   *          The function used for grouping the first input
+   * @param fun2
+   *          The function used for grouping the second input
+   * @return The grouped {@link ConnectedDataStream}
+   */
+  def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _):
+  ConnectedDataStream[IN1, IN2] = {
+
+    val keyExtractor1 = new KeySelector[IN1, Any] {
+      def getKey(in: IN1) = clean(fun1)(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, Any] {
+      def getKey(in: IN2) = clean(fun2)(in)
+    }
+
+    javaStream.groupBy(keyExtractor1, keyExtractor2)
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param coReducer
+   *          The {@link CoReduceFunction} that will be called for every
+   *          element of the inputs.
+   * @return The transformed {@link DataStream}.
+   */
+  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]):
+  DataStream[R] = {
+    if (coReducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+
+    new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]],
+      new CoReduceInvokable[IN1, IN2, R](coReducer)))
+  }
+
+  /**
+   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+   * the outputs to a common type. If the {@link ConnectedDataStream} is
+   * batched or windowed then the reduce transformation is applied on every
+   * sliding batch/window of the data stream. If the connected data stream is
+   * grouped then the reducer is applied on every group of elements sharing
+   * the same key. This type of reduce is much faster than reduceGroup since
+   * the reduce function can be applied incrementally.
+   *
+   * @param reducer1
+   *          The reduce function applied to elements of the first input
+   * @param reducer2
+   *          The reduce function applied to elements of the second input
+   * @param mapper1
+   *          The function mapping the first input to the common output type
+   * @param mapper2
+   *          The function mapping the second input to the common output type
+   * @return The transformed {@link DataStream}.
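+   *
+   * A hedged usage sketch (stream names and element types are illustrative
+   * only, not part of this commit):
+   * {{{
+   *   val reduced: DataStream[String] = in1.connect(in2).reduce(
+   *     (a: Int, b: Int) => a + b,     // reduce the first input
+   *     (a: Long, b: Long) => a + b,   // reduce the second input
+   *     (a: Int) => a.toString,        // map the first input to the common type
+   *     (a: Long) => a.toString)       // map the second input to the common type
+   * }}}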
+   */
+  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
+      reducer2: (IN2, IN2) => IN2, mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
+    if (mapper1 == null || mapper2 == null) {
+      throw new NullPointerException("Map functions must not be null.")
+    }
+    if (reducer1 == null || reducer2 == null) {
+      throw new NullPointerException("Reduce functions must not be null.")
+    }
+
+    val reducer = new CoReduceFunction[IN1, IN2, R] {
+      def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2)
+      def map2(value: IN2): R = clean(mapper2)(value)
+      def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2)
+      def map1(value: IN1): R = clean(mapper1)(value)
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for
+   * time-aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindowFunction
+   *          The {@link CoWindowFunction} that will be applied for the time
+   *          windows.
+   * @param windowSize
+   *          Size of the windows that will be aligned for both streams in
+   *          milliseconds.
+   * @param slideInterval
+   *          After every function call the windows will be slid by this
+   *          interval.
+   * @return The transformed {@link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction:
+      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = {
+    if (coWindowFunction == null) {
+      throw new NullPointerException("CoWindow function must not be null.")
+    }
+
+    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)
+  }
+
+  /**
+   * Applies a CoWindow transformation on the connected DataStreams. The
+   * transformation calls the {@link CoWindowFunction#coWindow} method for
+   * time-aligned windows of the two data streams. System time is used as
+   * default to compute windows.
+   *
+   * @param coWindower
+   *          The coWindowing function to be applied for the time windows.
+   * @param windowSize
+   *          Size of the windows that will be aligned for both streams in
+   *          milliseconds.
+   * @param slideInterval
+   *          After every function call the windows will be slid by this
+   *          interval.
+   * @return The transformed {@link DataStream}.
+   */
+  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2],
+      Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = {
+    if (coWindower == null) {
+      throw new NullPointerException("CoWindow function must not be null.")
+    }
+
+    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
+      def coWindow(first: util.List[IN1], second: util.List[IN2],
+        out: Collector[R]): Unit = clean(coWindower)(first, second, out)
+    }
+
+    javaStream.windowReduce(coWindowFun, windowSize, slideInterval)
+  }
+
+  /**
+   * Returns the first {@link DataStream}.
+   *
+   * @return The first DataStream.
+   */
+  def getFirst(): DataStream[IN1] = {
+    javaStream.getFirst
+  }
+
+  /**
+   * Returns the second {@link DataStream}.
+   *
+   * @return The second DataStream.
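+   *
+   * Illustrative: {{{ val second = connectedStream.getSecond }}} where
+   * {@code connectedStream} is a hypothetical ConnectedDataStream.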
+ */ + def getSecond(): DataStream[IN2] = { + javaStream.getSecond + } + + /** + * Gets the type of the first input + * + * @return The type of the first input + */ + def getInputType1(): TypeInformation[IN1] = { + javaStream.getInputType1 + } + + /** + * Gets the type of the second input + * + * @return The type of the second input + */ + def getInputType2(): TypeInformation[IN2] = { + javaStream.getInputType2 + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala new file mode 100644 index 0000000..270b80c --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
+  SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import org.apache.flink.streaming.api.invokable.operator.{GroupedReduceInvokable, StreamReduceInvokable}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.streaming.api.function.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
+import org.apache.flink.streaming.api.collector.OutputSelector
+import scala.collection.JavaConversions._
+import java.util.HashMap
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+
+class DataStream[T](javaStream: JavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+   */
+  def setParallelism(dop: Int): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + javaStream.toString +
+          " cannot have parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString +
+        " does not have parallelism.")
+  }
+
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   */
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+
+  /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different types with each other. The
+   * DataStreams connected using this operator can be used with CoFunctions.
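+   *
+   * A brief illustrative sketch (the two input streams are hypothetical):
+   * {{{
+   *   val connected: ConnectedDataStream[Int, String] = ints.connect(strings)
+   *   val mapped: DataStream[String] = connected.map(i => i.toString, s => s)
+   * }}}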
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] =
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
+   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.groupBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given key extractor function to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   */
+  def partitionBy(fields: Int*): DataStream[T] =
+    javaStream.partitionBy(fields: _*)
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the selected fields. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   */
+  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.partitionBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Sets the partitioning of the DataStream so that the output is
+   * partitioned by the given key. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   */
+  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.partitionBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcasted to every parallel instance of the next component. This
+   * setting only affects how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   */
+  def broadcast: DataStream[T] = javaStream.broadcast()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   */
+  def shuffle: DataStream[T] = javaStream.shuffle()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * affects how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
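+   *
+   * A hypothetical sketch of picking a partitioning before a transformation
+   * ({@code someStream} is illustrative):
+   * {{{
+   *   val local = someStream.forward.map(x => x)     // keep elements on the local subtask
+   *   val spread = someStream.distribute.map(x => x) // distribute elements evenly
+   * }}}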
+   */
+  def forward: DataStream[T] = javaStream.forward()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component. This setting only affects
+   * how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   */
+  def distribute: DataStream[T] = javaStream.distribute()
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStreams.
+   * Please refer to the .split(...) method of the DataStream.
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data is received within the set time, the stream terminates.
+   */
+  def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis:
+      Long = 0): DataStream[T] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream)
+    output
+  }
+
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of a tie, the user can choose to get the first or last
+   * element with the minimal value.
+   */
+  def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
+    .MINBY, position, first)
+
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of a tie, the user can choose to get the first or last
+   * element with the maximal value.
+   */
+  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position, first)
+
+  private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
+  DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
+        getTypeClass()));
+      case _ => new agg.ProductComparableAggregator(aggregationType, first)
+    }
+
+    val invokable = jStream match {
+      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
+      case _ => new StreamReduceInvokable(reducer)
+    }
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
+  }
+
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   */
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def map(in: T): R = cleanFun(in)
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    javaStream.transform("flatMap", implicitly[TypeInformation[R]],
+      new FlatMapInvokable[T, R](flatMapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
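+   *
+   * A minimal sketch, assuming a hypothetical DataStream[Int] named intStream:
+   * {{{
+   *   val maxima = intStream.reduce((a, b) => if (a > b) a else b)
+   * }}}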
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream match {
+      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+      case _ => javaStream.transform("reduce", javaStream.getType(),
+        new StreamReduceInvokable[T](reducer))
+    }
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataStream[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    javaStream.filter(filter)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(fun: T => Boolean): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      val cleanFun = clean(fun)
+      def filter(in: T) = cleanFun(in)
+    }
+    this.filter(filter)
+  }
+
+  /**
+   * Create a WindowedDataStream that can be used to apply
+   * transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows one or
+   * more WindowingHelper-s such as Time, Count and
+   * Delta can be used. When applied to a grouped data
+   * stream, the windows (evictions) and slide sizes (triggers) will be
+   * computed on a per-group basis. For more advanced control over
+   * the trigger and eviction policies please use
+   * window(List(triggers), List(evicters)).
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.window(windowingHelper: _*)
+
+  /**
+   * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. For the most common
+   * use cases please refer to window(WindowingHelper[_]*).
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+  WindowedDataStream[T] = javaStream.window(triggers, evicters)
+
+  /**
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString +
+        " can not be split.")
+  }
+
+  /**
+   * Creates a new SplitDataStream that routes each element to the output
+   * named by the given selector function.
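+   *
+   * Illustrative usage (the numbers stream is hypothetical):
+   * {{{
+   *   val parts = numbers.split(n => if (n % 2 == 0) "even" else "odd")
+   *   val evens = parts.select("even")
+   * }}}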
+
+  /**
+   * Creates a WindowedDataStream that can be used to apply
+   * transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows one or
+   * more WindowingHelpers such as Time, Count and Delta can be used.
+   *
+   * When applied to a grouped data stream, the windows (evictions) and slide
+   * sizes (triggers) will be computed on a per group basis.
+   *
+   * For more advanced control over the trigger and eviction policies please
+   * refer to window(List(triggers), List(evicters))
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.window(windowingHelper: _*)
+
+  /**
+   * Creates a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream.
+   *
+   * For the most common use cases please refer to window(WindowingHelper[_]*)
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = javaStream.window(triggers, evicters)
+
+  /**
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + javaStream.toString +
+        " cannot be split.")
+  }
+
+  /**
+   * Creates a new SplitDataStream that routes each element to the output named
+   * by the given selector function.
+   */
+  def split(fun: T => String): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val selector = new OutputSelector[T] {
+      val cleanFun = clean(fun)
+      def select(in: T): java.lang.Iterable[String] = {
+        List(cleanFun(in))
+      }
+    }
+    split(selector)
+  }
+
+  /**
+   * Initiates a temporal join transformation that joins the elements of two
+   * data streams on key equality over a specified time window.
+   *
+   * This method returns a StreamJoinOperator on which
+   * .onWindow(..) should be called to define the
+   * window, and then the .where(..) and .equalTo(..) methods can be used to define
+   * the join keys. The apply method of the returned JoinedStream can be used
+   * to apply a custom join function.
+   */
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Initiates a temporal cross transformation that builds all pair
+   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
+   * product.
+   *
+   * This method returns a StreamCrossOperator on which
+   * .onWindow(..) should be called to define the window. The apply method of
+   * the returned stream can be used to apply a custom cross function.
+   */
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is written.
+   */
+  def print(): DataStream[T] = javaStream.print()
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString is written.
+   */
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsText(path, millis)
+
+  /**
+   * Writes a DataStream to the file specified by path in CSV format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString is written.
+   */
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsCsv(path, millis)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   */
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
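
For illustration (not part of this commit), and assuming the Time helper added in
this commit exposes a Time.of(length, TimeUnit) factory, a windowed rolling sum
over the last ten seconds might be sketched as:

    import java.util.concurrent.TimeUnit
    // window(...) yields a WindowedDataStream; reduce is applied per window
    val windowedSum = nums.window(Time.of(10, TimeUnit.SECONDS)).reduce((a, b) => a + b)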
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
+
+}
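
For illustration (not part of this commit): the lambda addSink variant above makes
a printing sink a one-liner, reusing the "lengths" stream from the earlier sketch:

    lengths.addSink(len => println("length: " + len))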

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
new file mode 100644
index 0000000..7349db6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * OutputSelector. Named outputs can be selected using the select function.
+ *
+ * @tparam T The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  private[flink] def getJavaStream: SplitJavaStream[T] = javaStream
+
+  /**
+   * Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
+
+  /**
+   * Selects all output names from a split data stream.
+   */
+  def selectAll(): DataStream[T] = javaStream.selectAll()
+
+}
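
For illustration (not part of this commit): DataStream.split and select combine to
route elements by name, again assuming a DataStream[Int] named "nums":

    val parity: SplitDataStream[Int] = nums.split(n => if (n % 2 == 0) "even" else "odd")
    val evens: DataStream[Int] = parity.select("even")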

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
new file mode 100644
index 0000000..e26db62
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.CrossFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.scala.StreamingConversions._
+
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
+
+  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
+
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
+      (l: I1, r: I2) => (l, r))
+
+    val returnType = new CaseClassTypeInfo[(I1, I2)](
+      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
+
+      override def createSerializer: TypeSerializer[(I1, I2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+          }
+        }
+      }
+    }
+
+    val javaStream = input1.connect(input2).addGeneralWindowCombine(
+      crossWindowFunction,
+      returnType, windowSize,
+      slideInterval, timeStamp1, timeStamp2)
+
+    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
+  }
+}
+
+object StreamCrossOperator {
+
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
+    javaStream: JavaStream[(I1, I2)]) extends
+    DataStream[(I1, I2)](javaStream) {
+
+    /**
+     * Sets a wrapper for the crossed elements. For each crossed pair, the result
+     * of the UDF call will be emitted.
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+        invokable)
+
+      javaStream.setType(implicitly[TypeInformation[R]])
+    }
+  }
+
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
+    crossFunction: (I1, I2) => R):
+    CrossWindowFunction[I1, I2, R] = {
+    Validate.notNull(crossFunction, "Cross function must not be null.")
+
+    val crossFun = new CrossFunction[I1, I2, R] {
+      val cleanFun = op.input1.clean(crossFunction)
+
+      override def cross(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new CrossWindowFunction[I1, I2, R](crossFun)
+  }
+
+}
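
For illustration (not part of this commit): a temporal cross follows the shape
below. The window definition is elided as (..), mirroring the scaladoc above,
since onWindow's exact signature is not shown in this excerpt:

    // yields a CrossWindow, i.e. a DataStream[(Int, String)]; apply wraps each pair
    val swapped = nums.cross(words).onWindow(..).apply((n, w) => (w, n))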

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..361abd6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
+import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction }
+import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
+import org.apache.flink.util.Collector
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations using
+   * [[DataStream.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]].
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Sets the maximum time frequency (milliseconds) for the flushing of the
+   * output buffers. By default the output buffers flush frequently to provide
+   * low latency and to aid a smooth developer experience. Setting the parameter
+   * can result in three logical modes:
+   *
+   * <ul>
+   *   <li>A positive integer triggers flushing periodically, after that many
+   *   milliseconds</li>
+   *   <li>0 triggers flushing after every record, thus minimizing latency</li>
+   *   <li>-1 triggers flushing only when the output buffer is full, thus maximizing
+   *   throughput</li>
+   * </ul>
+   */
+  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
+    javaEnv.setBufferTimeout(timeoutMillis)
+    this
+  }
+
+  /**
+   * Gets the default buffer timeout set for this environment.
+   */
+  def getBufferTimeout: Long = javaEnv.getBufferTimeout()
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line. The file will be read with the system's default
+   * character set.
+   */
+  def readTextFile(filePath: String): DataStream[String] =
+    javaEnv.readTextFile(filePath)
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line by line, repeatedly (infinitely). The file will be read with
+   * the system's default character set. This functionality can be used for
+   * testing a topology.
+   */
+  def readTextStream(streamPath: String): DataStream[String] =
+    javaEnv.readTextStream(streamPath)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the given socket. Received strings are decoded by the system's default
+   * character set.
+   */
+  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port, delimiter)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the given socket. Received strings are decoded by the system's default
+   * character set, using '\n' as the delimiter.
+   */
+  def socketTextStream(hostname: String, port: Int): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port)
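
For illustration (not part of this commit): configuring an environment and attaching
a socket source, with host and port as placeholder values:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setDegreeOfParallelism(4)
    env.setBufferTimeout(100) // flush output buffers at least every 100 ms
    val lines: DataStream[String] = env.socketTextStream("localhost", 9999)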
+
+  /**
+   * Creates a new DataStream that contains a sequence of numbers.
+   */
+  def generateSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
+  }
+
+  /**
+   * Creates a DataStream that contains the given elements. The elements must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag: TypeInformation](
+    data: Seq[T]): DataStream[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+
+    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
+      .asJavaCollection(data))
+
+    javaEnv.addSource(sourceFunction, typeInfo)
+  }
+
+  /**
+   * Creates a DataStream using a user defined source function for arbitrary
+   * source functionality.
+   */
+  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val cleanFun = StreamExecutionEnvironment.clean(function)
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.addSource(cleanFun, typeInfo)
+  }
+
+  /**
+   * Creates a DataStream using a user defined source function for arbitrary
+   * source functionality.
+   */
+  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val sourceFunction = new SourceFunction[T] {
+      val cleanFun = StreamExecutionEnvironment.clean(function)
+      override def invoke(out: Collector[T]) {
+        cleanFun(out)
+      }
+    }
+    addSource(sourceFunction)
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   */
+  def execute() = javaEnv.execute()
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with the provided name.
+   */
+  def execute(jobName: String) = javaEnv.execute(jobName)
+
+}
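
For illustration (not part of this commit): a minimal end-to-end program using the
sources above; nothing runs until execute() is called:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("fromElements sketch")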
+
+object StreamExecutionEnvironment {
+
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Creates an execution environment that represents the context in which the program is
+   * currently executed. If the program is invoked standalone, this method returns a local
+   * execution environment. If the program is invoked from within the command line client
+   * to be submitted to a cluster, this method returns the execution environment of this cluster.
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   */
+  def createLocalEnvironment(
+    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
+    StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program must be accessible from
+   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
+    StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The execution will use the specified degree of parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+    host: String,
+    port: Int,
+    degreeOfParallelism: Int,
+    jarFiles: String*): StreamExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new StreamExecutionEnvironment(javaEnv)
+  }
+}
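
For illustration (not part of this commit): the remote variant, with host, port and
jar path as placeholder values (6123 being the usual JobManager default port):

    val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
      "jobmanager-host", 6123, "/path/to/program.jar")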
