[FLINK-1625] [streaming] Streaming cancellation minor fix and documentation

This closes #449


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ef02eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ef02eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ef02eb

Branch: refs/heads/master
Commit: 08ef02ebade3016b31fe4b401e93fa0a7080147c
Parents: 8436e9c
Author: mbalassi <[email protected]>
Authored: Wed Mar 4 16:27:07 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Mar 4 22:38:59 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 12 ++++---
 .../api/function/sink/SinkFunction.java         | 15 ++++++++
 .../function/source/ParallelSourceFunction.java |  6 +++-
 .../api/function/source/SourceFunction.java     | 38 +++++++++++++++-----
 .../api/invokable/StreamInvokable.java          |  8 +++++
 5 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 3d6b75e..0fb7dac 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -168,7 +168,11 @@ Usage: `dataStream.broadcast()`
  * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
 Usage: `operator.setParallelism(1)`
 
-### Sources
+### Connecting to the outside world
+
+The user is expected to connect to the outside world through the source and the sink interfaces. We provide a `cancel()` method where allocated resources can be freed up in case some other parts of the topology failed. The `cancel()` method is called upon termination.
+
+#### Sources
 
 The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
 
@@ -186,7 +190,7 @@ There are several predefined ones similar to the ones of the batch API and some
 These can be used to easily test and debug streaming programs.
 There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
 
-### Sinks
+#### Sinks
 
 `DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations available right away:
 
@@ -495,13 +499,13 @@ Most data stream operators support directed outputs (output splitting), meaning
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
-DataStream<Integer> even = split.select("even”);
+DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
 In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
 
-Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”, …)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 05ae34d..ffa5a67 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,10 +21,25 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 
+/**
+ * Interface for implementing user defined sink functionality.
+ *
+ * @param <IN> INput type parameter.
+ */
 public interface SinkFunction<IN> extends Function, Serializable {
 
+       /**
+        * Function for standard sink behaviour. This function is called for every record.
+        *
+        * @param value The input record.
+        * @throws Exception
+        */
        public void invoke(IN value) throws Exception;
 
+       /**
+        * In case another vertex in topology fails this method is called before terminating
+        * the sink. Make sure to free up any allocated resources here.
+        */
        public void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
index 041915f..e37e851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.api.function.source;
 
+/**
+ * {@link SourceFunction} that may be executed in parallel.
+ *
+ * @param <OUT>
+ */
 public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 4f579fe..af63d80 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,20 +12,40 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.Serializable;
-
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
-public interface SourceFunction<OUT> extends Function, Serializable {
-
-       public void run(Collector<OUT> collector) throws Exception;
-       
-       public void cancel();
+/**
+ * Interface for implementing user defined source functionality.
+ *
+ * <p>Sources implementing this specific interface are executed with
+ * degree of parallelism 1. To execute your sources in parallel
+ * see {@link ParallelSourceFunction}.</p>
+ *
+ * @param <OUT> Output type parameter.
+ */
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+       /**
+        * Function for standard source behaviour. This function is called only once
+        * thus to produce multiple outputs make sure to produce multiple records.
+        *
+        * @param collector Collector for passing output records
+        * @throws Exception
+        */
+       public void run(Collector<OUT> collector) throws Exception;
+
+       /**
+        * In case another vertex in topology fails this method is called before terminating
+        * the source. Make sure to free up any allocated resources here.
+        */
+       public void cancel();
                
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 85fb9a4..abe31d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -114,6 +114,14 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
                                // Task already cancelled do nothing
                                return null;
                        }
+               }  catch (IllegalStateException e) {
+                       if (isRunning) {
+                               throw new RuntimeException("Could not read next record due to: "
+                                               + StringUtils.stringifyException(e));
+                       } else {
+                               // Task already cancelled do nothing
+                               return null;
+                       }
                }
        }
 

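Note: the `cancel()` contract that this commit documents for `SourceFunction` can be illustrated with a minimal sketch. The `Collector` and `SourceFunction` interfaces below are simplified local stand-ins that mirror the signatures shown in the diff, so the example compiles without Flink on the classpath. `BoundedCounterSource`, its `running` flag, `isResourceReleased()` and `SourceDemo` are hypothetical names used only for illustration and are not part of the commit.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.util.Collector (assumption: only collect() is needed here).
interface Collector<T> {
    void collect(T record);
}

// Stand-in mirroring the SourceFunction signatures from the diff above.
interface SourceFunction<OUT> extends Serializable {
    void run(Collector<OUT> collector) throws Exception;

    void cancel();
}

// Hypothetical source: emits 0..limit-1 and stops early once cancel() has been called.
class BoundedCounterSource implements SourceFunction<Integer> {
    private static final long serialVersionUID = 1L;

    private final int limit;
    // volatile because cancel() may be invoked from a different thread than run()
    private volatile boolean running = true;
    private volatile boolean resourceReleased = false;

    BoundedCounterSource(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(Collector<Integer> collector) {
        // run() is called only once, so all records are produced inside this loop
        for (int i = 0; i < limit && running; i++) {
            collector.collect(i);
        }
    }

    @Override
    public void cancel() {
        running = false;
        // a real source would close sockets, files or client connections here
        resourceReleased = true;
    }

    boolean isResourceReleased() {
        return resourceReleased;
    }
}

// Hypothetical helper that drains a source into a list, for demonstration only.
class SourceDemo {
    static List<Integer> runToList(SourceFunction<Integer> source) {
        List<Integer> out = new ArrayList<>();
        try {
            source.run(out::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }
}
```

Under these assumptions, calling `cancel()` before or during `run()` makes the loop exit and marks the resource as released, which is the cleanup behaviour the new Javadoc asks implementers to provide.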