[streaming] Updated streaming guide for recent connector and data source changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8500ad08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8500ad08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8500ad08

Branch: refs/heads/master
Commit: 8500ad08dd764551673d992db9ec7e3f61095a53
Parents: 11518b5
Author: Gyula Fora <[email protected]>
Authored: Mon Jan 5 18:26:18 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Tue Jan 6 15:09:04 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 309 +++----------------
 .../api/scala/StreamExecutionEnvironment.scala  |   6 +-
 2 files changed, 56 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8500ad08/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index bea0907..c7e7060 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -170,7 +170,9 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implementations of 
`SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. 
In contrast with other operators, DataStreamSources have a default operator 
parallelism of 1 which can be increased using the `.setParallelism(dop)` method 
as we will see later at the operator settings.
+The user can connect to data streams by the different implementations of 
`SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. 
In contrast with other operators, DataStreamSources have a default operator 
parallelism of 1.
+
+To create parallel sources the users source function needs to implement 
`ParallelSourceFunction` or extend `RichParallelSourceFunction` in which cases 
the source will have the parallelism of the environment. The degree of 
parallelism for ParallelSourceFunctions can be changed afterwards using 
`source.setParallelism(int dop)`.
 
 There are several predefined ones similar to the ones of the batch API and 
some streaming specific ones like:
 
@@ -617,7 +619,7 @@ Stream connectors
 
 Connectors provide an interface for accessing data from various third party 
sources (message queues). Currently four connectors are natively supported, 
namely [Apache Kafka](https://kafka.apache.org/),  
[RabbitMQ](http://www.rabbitmq.com/), [Apache 
Flume](https://flume.apache.org/index.html) and [Twitter Streaming 
API](https://dev.twitter.com/docs/streaming-apis).
 
-Typically the connector packages consist of an abstract source and sink (with 
the exception of Twitter where only a source is provided). The burden of the 
user is to implement a subclass of these abstract classes specifying a 
serializer and a deserializer function. 
+Typically the connector packages consist of a source and sink class (with the 
exception of Twitter where only a source is provided). To use these sources the 
user needs to pass Serialization/Deserialization schemas for the connectors for 
the desired types. (Or use some predefined ones)
 
 To run an application using one of these connectors usually additional third 
party components are required to be installed and launched, e.g. the servers 
for the message queues. Further instructions for these can be found in the 
corresponding subsections. [Docker 
containers](#docker-containers-for-connectors) are also provided encapsulating 
these services to aid users getting started with connectors.
 
@@ -631,106 +633,40 @@ This connector provides access to data streams from 
[Apache Kafka](https://kafka
 * If the Kafka zookeeper and server are running on a remote machine then in 
the config/server.properties file the advertised.host.name must be set to the 
machine's IP address.
 
 #### Kafka Source
-An abstract class providing an interface for receiving data from Kafka. By 
implementing the user must:
+A class providing an interface for receiving data from Kafka.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Kafka,
- * Stop the source manually when necessary with one of the close functions.
+The followings have to be provided for the `KafkaSource(..)` constructor in 
order:
 
-The implemented class must extend `KafkaSource`, for example: 
`KafkaSource<String>`.
+1. The hostname
+2. The group name
+3. The topic name
+4. The parallelism
+5. Deserialisation schema
 
-##### Constructor
-An example of an implementation of a constructor:
 
-~~~java
-public MyKafkaSource(String zkQuorum, String groupId, String topicId, int 
numThreads) {
-    super(zkQuorum, groupId, topicId, numThreads);
-}
-~~~
-
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = new String(msg);
-    if(s.equals("q")){
-        closeWithoutSend();
-    }
-    return new String(s);
-}
+DataStream<String> stream = env
+       .addSource(new KafkaSource<String>("localhost:2181", "group", 
"test",new SimpleStringSchema()))
+       .print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-###### Close<a name="kafka_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and 
`sendAndClose()`. The former closes the connection immediately and no further 
data will be sent, while the latter closes the connection only when the next 
message is sent after this call.
-
-In the example provided `closeWithoutSend()` is used because here the String 
`"q"` is meta-message indicating the end of the stream and there is no need to 
forward it. 
-
 #### Kafka Sink
-An abstract class providing an interface for sending data to Kafka. By 
implementing the user must:
-
- * Write a constructor calling the constructor of the abstract class,
- * Write a serializer function to send data in the desired form to Kafka,
- * Stop the sink manually when necessary with one of the close functions.
-
-The implemented class must extend `KafkaSink`, for example `KafkaSink<String, 
String>`.
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyKafkaSink(String topicId, String brokerAddr) {
-    super(topicId, brokerAddr);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer:
-
-~~~java
-@Override
-public String serialize(String tuple) {
-    if(tuple.equals("q")){
-        sendAndClose();
-    }
-    return tuple;
-}
-~~~
+A class providing an interface for sending data to Kafka. 
 
-##### Close
-The API provided is the [same](#kafka_source_close) as the one for 
`KafkaSource`.
+The followings have to be provided for the `KafkaSink()` constructor in order:
 
-#### Building A Topology
-To use a Kafka connector as a source in Flink call the `addSource()` function 
with a new instance of the class which extends `KafkaSource` as parameter:
-
-~~~java
-DataStream<String> stream1 = env.
-    addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), 
SOURCE_PARALELISM)
-    .print();
-~~~
-
-The followings have to be provided for the `MyKafkaSource()` constructor in 
order:
-
-1. The hostname
-2. The group name
-3. The topic name
-4. The parallelism
+1. The topic name
+2. The hostname
+3. Serialisation schema
 
-Similarly to use a Kafka connector as a sink in Flink call the `addSink()` 
function with a new instance of the class which extends `KafkaSink`:
+Example: 
 
 ~~~java
-DataStream<String> stream2 = env
-    .addSource(new MySource())
-    .addSink(new MyKafkaSink("test", "localhost:9092"));
+stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new 
SimpleStringSchema()));
 ~~~
 
-The followings have to be provided for the `MyKafkaSink()` constructor in 
order:
-
-1. The topic name
-2. The hostname
 
 More about Kafka can be found 
[here](https://kafka.apache.org/documentation.html).
 
@@ -741,114 +677,40 @@ More about Kafka can be found 
[here](https://kafka.apache.org/documentation.html
 This connector provides access to datastreams from [Apache 
Flume](http://flume.apache.org/).
 
 #### Installing Apache Flume
-[Download](http://flume.apache.org/download.html) Apache Flume. A 
configuration file is required for starting agents in Flume. A configuration 
file for running the example can be found [here](#config_file). 
+[Download](http://flume.apache.org/download.html) Apache Flume. A 
configuration file is required for starting agents in Flume. A configuration 
file for running the example can be found [here](#config_file).
 
 #### Flume Source
-An abstract class providing an interface for receiving data from Flume. By 
implementing the user must:
+A class providing an interface for receiving data from Flume.
 
- * Write a constructor calling the constructor of the abstract class,
- * Write a deserializer function which processes the data coming from Flume,
- * Stop the source manually when necessary with one of the close functions.
+The followings have to be provided for the `FlumeSource(…)` constructor in 
order:
 
-The implemented class must extend `FlumeSource` for example: 
`FlumeSource<String>`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-MyFlumeSource(String host, int port) {
-    super(host, port);
-}
-~~~
+1. The hostname
+2. The port number
+3. Deserialisation schema
 
-##### Deserializer
-An example of an implementation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] msg) {
-    String s = (String) SerializationUtils.deserialize(msg);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+       .addSource(new FlumeSource<String>("localhost", 41414, new 
SimpleStringSchema()))
+       .print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close<a name="flume_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and 
`sendAndClose()`.The former closes the connection immediately and no further 
data will be sent, while the latter closes the connection only when the next 
message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is 
meta-message indicating the end of the stream and there is no need to forward 
it. 
-
 #### Flume Sink
-An abstract class providing an interface for sending data to Flume. By 
implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a serializer function to send data in the desired form to Flume,
-* Stop the sink manually when necessary with one of the close functions.
+A class providing an interface for sending data to Flume. 
 
-The implemented class must extend `FlumeSink`, for example `FlumeSink<String, 
String>`.
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyFlumeSink(String host, int port) {
-    super(host, port);
-}
-~~~
-
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(String tuple) {
-    if (tuple.equals("q")) {
-        try {
-            sendAndClose();
-        } catch (Exception e) {
-            new RuntimeException("Error while closing Flume connection with " 
+ port + " at "
-                + host, e);
-        }
-    }
-    return SerializationUtils.serialize(tuple);
-}
-~~~
-
-##### Close
-The API provided is the [same](#flume_source_close) as the one for 
`FlumeSource`.
-
-#### Building A Topology
-To use a Flume connector as a source in Flink call the `addSource()` function 
with a new instance of the class which extends `FlumeSource` as parameter:
-
-~~~java
-DataStream<String> dataStream1 = env
-    .addSource(new MyFlumeSource("localhost", 41414))
-    .print();
-~~~
-
-The followings have to be provided for the `MyFlumeSource()` constructor in 
order:
+The followings have to be provided for the `FlumeSink(…)` constructor in 
order:
 
 1. The hostname
 2. The port number
+3. Serialisation schema
 
-Similarly to use a Flume connector as a sink in Flink call the `addSink()` 
function with a new instance of the class which extends `FlumeSink`
+Example: 
 
 ~~~java
-DataStream<String> dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyFlumeSink("localhost", 42424));
+stream.addSink(new FlumeSink<String>("localhost", 42424, new 
StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyFlumeSink()` constructor in 
order:
-
-1. The hostname
-2. The port number
-
 ##### Configuration file<a name="config_file"></a>
 An example of a configuration file:
 
@@ -890,107 +752,38 @@ This connector provides access to datastreams from 
[RabbitMQ](http://www.rabbitm
 Follow the instructions from the [RabbitMQ download 
page](http://www.rabbitmq.com/download.html). After the installation the server 
automatically starts and the application connecting to RabbitMQ can be launched.
 
 #### RabbitMQ Source
-An abstract class providing an interface for receiving data from RabbitMQ. By 
implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class,
-* Write a deserializer function which processes the data coming from RabbitMQ,
-* Stop the source manually when necessary with one of the close functions.
 
-The implemented class must extend `RabbitMQSource` for example: 
`RabbitMQSource<String>`
+A class providing an interface for receiving data from RabbitMQ.
 
-##### Constructor
-An example of an implementation of a constructor:
+The followings have to be provided for the `RMQSource(…)` constructor in 
order:
 
-~~~java
-public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+1. The hostname
+2. The queue name
+3. Deserialisation schema
 
-##### Deserializer
-An example of an implemetation of a deserializer:
+Example:
 
 ~~~java
-@Override
-public String deserialize(byte[] t) {
-    String s = (String) SerializationUtils.deserialize(t);
-    String out = s;
-    if (s.equals("q")) {
-        closeWithoutSend();
-    }
-    return out;
-}
+DataStream<String> stream = env
+       .addSource(new RMQSource<String>("localhost", "hello", new 
SimpleStringSchema()))
+       .print();
 ~~~
 
-The source closes when it receives the String `"q"`.
-
-##### Close<a name="rmq_source_close"></a>
-Two types of close functions are available, namely `closeWithoutSend()` and 
`sendAndClose()`. The former closes the connection immediately and no further 
data will be sent, while the latter closes the connection only when the next 
message is sent after this call.
-
-Closes the connection only when the next message is sent after this call.
-
-In the example `closeWithoutSend()` is used because here the String `"q"` is 
meta-message indicating the end of the stream and there is no need to forward 
it. 
-
 #### RabbitMQ Sink
-An abstract class providing an interface for sending data to RabbitMQ. By 
implementing the user must:
-
-* Write a constructor calling the constructor of the abstract class
-* Write a serializer function to send data in the desired form to RabbitMQ
-* Stop the sink manually when necessary with one of the close functions
-
-The implemented class must extend `RabbitMQSink` for example: 
`RabbitMQSink<String, String>`
-
-##### Constructor
-An example of an implementation of a constructor:
-
-~~~java
-public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
-    super(HOST_NAME, QUEUE_NAME);
-}
-~~~
+A class providing an interface for sending data to RabbitMQ. 
 
-##### Serializer
-An example of an implementation of a serializer.
-
-~~~java
-@Override
-public byte[] serialize(Tuple tuple) {
-    if (t.getField(0).equals("q")) {
-        sendAndClose();
-    }
-    return SerializationUtils.serialize(tuple.f0);
-}
-~~~
-
-##### Close
-The API provided is the [same](#rmq_source_close) as the one for 
`RabbitMQSource`.
-
-#### Building A Topology
-To use a RabbitMQ connector as a source in Flink call the `addSource()` 
function with a new instance of the class which extends `RabbitMQSource` as 
parameter:
-
-~~~java
-DataStream<String> dataStream1 = env
-    .addSource(new MyRMQSource("localhost", "hello"))
-    .print();
-~~~
-
-The followings have to be provided for the `MyRabbitMQSource()` constructor in 
order:
+The followings have to be provided for the `RMQSink(…)` constructor in order:
 
 1. The hostname
 2. The queue name
+3. Serialisation schema
 
-Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` 
function with a new instance of the class which extends `RabbitMQSink`
+Example: 
 
 ~~~java
-DataStream<String> dataStream2 = env
-    .fromElements("one", "two", "three", "four", "five", "q")
-    .addSink(new MyRMQSink("localhost", "hello"));
+stream.addSink(new RMQSink<String>("localhost", "hello", new 
StringToByteSerializer()));
 ~~~
 
-The followings have to be provided for the `MyRabbitMQSink()` constructor in 
order:
-
-1. The hostname
-1. The queue name
 
 More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8500ad08/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 361abd6..61a6109 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -151,7 +151,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
+   * source functionality. By default sources have a parallelism of 1. 
+   * To enable parallel execution, the user defined source should implement 
+   * ParallelSourceFunction or extend RichParallelSourceFunction. 
+   * In these cases the resulting source will have the parallelism of the 
environment. 
+   * To change this afterwards call DataStreamSource.setParallelism(int)
    *
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): 
DataStream[T] = {

Reply via email to