Repository: spark
Updated Branches:
  refs/heads/branch-1.0 80f4360e7 -> 1fac4ecbd


[SPARK-1504], [SPARK-1505], [SPARK-1558] Updated Spark Streaming guide

- SPARK-1558: Updated custom receiver guide to match it with the new API
- SPARK-1504: Added deployment and monitoring subsection to streaming
- SPARK-1505: Added migration guide for migrating from 0.9.x and below to Spark 
1.0
- Updated various Java streaming examples to use JavaReceiverInputDStream to 
highlight the API change.
- Removed the requirement for cleaner ttl from streaming guide

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #652 from tdas/doc-fix and squashes the following commits:

cb4f4b7 [Tathagata Das] Possible fix for flaky graceful shutdown test.
ab71f7f [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into doc-fix
8d6ff9b [Tathagata Das] Added migration guide to Spark Streaming.
7d171df [Tathagata Das] Added reference to JavaReceiverInputStream in examples 
and streaming guide.
49edd7c [Tathagata Das] Change java doc links to use Java docs.
11528d7 [Tathagata Das] Updated links on index page.
ff80970 [Tathagata Das] More updates to streaming guide.
4dc42e9 [Tathagata Das] Added monitoring and other documentation in the 
streaming guide.
14c6564 [Tathagata Das] Updated custom receiver guide.

(cherry picked from commit a975a19f21e71f448b3fdb2ed4461e28ef439900)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fac4ecb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fac4ecb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fac4ecb

Branch: refs/heads/branch-1.0
Commit: 1fac4ecbdd0c15992fd75372cbd7fec24244d21b
Parents: 80f4360
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon May 5 15:28:19 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon May 5 15:28:54 2014 -0700

----------------------------------------------------------------------
 docs/configuration.md                           |   6 +-
 docs/index.md                                   |   6 +-
 docs/streaming-custom-receivers.md              | 273 +++++++++++++------
 docs/streaming-programming-guide.md             | 200 +++++++++++---
 .../streaming/examples/JavaCustomReceiver.java  |   3 +-
 .../streaming/examples/JavaFlumeEventCount.java |   2 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   4 +-
 .../examples/JavaNetworkWordCount.java          |   5 +-
 .../spark/streaming/receiver/Receiver.scala     |   6 +-
 .../spark/streaming/StreamingContextSuite.scala |   1 +
 10 files changed, 360 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 81ad895..d6f316b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -462,7 +462,7 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>(infinite)</td>
   <td>
     Duration (seconds) of how long Spark will remember any metadata (stages 
generated, tasks generated, etc.).
-    Periodic cleanups will ensure that metadata older than this duration will 
be forgetten. This is
+    Periodic cleanups will ensure that metadata older than this duration will 
be forgotten. This is
     useful for running Spark for many hours / days (for example, running 24/7 
in case of Spark Streaming
     applications). Note that any RDD that persists in memory for more than 
this duration will be cleared as well.
   </td>
@@ -471,8 +471,8 @@ Apart from these, the following properties are also 
available, and may be useful
   <td>spark.streaming.blockInterval</td>
   <td>200</td>
   <td>
-    Duration (milliseconds) of how long to batch new objects coming from 
network receivers used
-    in Spark Streaming.
+    Interval (milliseconds) at which data received by Spark Streaming 
receivers is coalesced
+    into blocks of data before storing them in Spark.
   </td>
 </tr>
 <tr>
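
As a rough illustration of what `spark.streaming.blockInterval` controls: a receiver cuts one block per block interval, so the number of blocks it contributes to a batch is approximately the batch interval divided by the block interval. The sketch below only does that arithmetic; the class and method names are hypothetical, and the 2-second batch interval is an assumed value, not something taken from this commit.

```java
public class BlockIntervalSketch {

  /**
   * Approximate number of blocks a single receiver produces per batch,
   * assuming one block is cut every blockIntervalMs milliseconds.
   */
  static long blocksPerBatch(long batchIntervalMs, long blockIntervalMs) {
    if (blockIntervalMs <= 0) {
      throw new IllegalArgumentException("block interval must be positive");
    }
    return batchIntervalMs / blockIntervalMs;
  }

  public static void main(String[] args) {
    long batchIntervalMs = 2000; // assumed batch interval, for illustration only

    // With the documented default of 200 ms
    System.out.println(blocksPerBatch(batchIntervalMs, 200)); // prints 10

    // Halving the block interval doubles the number of blocks per batch
    System.out.println(blocksPerBatch(batchIntervalMs, 100)); // prints 20
  }
}
```

A smaller block interval therefore yields more, smaller blocks per batch; whether that helps depends on the workload.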

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 6fc9a4f..2daa208 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -112,10 +112,10 @@ Note that on Windows, you need to set the environment 
variables on separate line
 * [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark
 * [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions 
about Spark here
 * [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC 
Berkeley that featured talks and
-  exercises about Spark, Shark, Mesos, and more. 
[Videos](http://ampcamp.berkeley.edu/agenda-2012),
-  [slides](http://ampcamp.berkeley.edu/agenda-2012) and 
[exercises](http://ampcamp.berkeley.edu/exercises-2012) are
+  exercises about Spark, Shark, Spark Streaming, Mesos, and more. 
[Videos](http://ampcamp.berkeley.edu/3/),
+  [slides](http://ampcamp.berkeley.edu/3/) and 
[exercises](http://ampcamp.berkeley.edu/3/exercises/) are
   available online for free.
-* [Code Examples](http://spark.apache.org/examples.html): more are also 
available in the [examples 
subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/)
 of Spark
+* [Code Examples](http://spark.apache.org/examples.html): more are also 
available in the [examples 
subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/)
 of Spark
 * [Paper Describing 
Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf)
 * [Paper Describing Spark 
Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md 
b/docs/streaming-custom-receivers.md
index 3cfa451..a2dc3a8 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -3,126 +3,219 @@ layout: global
 title: Spark Streaming Custom Receivers
 ---
 
-A "Spark Streaming" receiver can be a simple network stream, streams of 
messages from a message queue, files etc. A receiver can also assume roles more 
than just receiving data like filtering, preprocessing, to name a few of the 
possibilities. The api to plug-in any user defined custom receiver is thus 
provided to encourage development of receivers which may be well suited to ones 
specific need.
+Spark Streaming can receive streaming data from any arbitrary data source 
beyond
+the ones for which it has in-built support (that is, beyond Flume, Kafka, 
files, sockets, etc.).
+This requires the developer to implement a *receiver* that is customized for 
receiving data from
+the concerned data source. This guide walks through the process of 
implementing a custom receiver
+and using it in a Spark Streaming application.
+
+### Implementing a Custom Receiver
+
+This starts with implementing a 
[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+A custom receiver must extend this abstract class by implementing two methods:
+- `onStart()`: Things to do to start receiving data.
+- `onStop()`: Things to do to stop receiving data.
+
+Note that `onStart()` and `onStop()` must not block indefinitely. Typically, 
`onStart()` would start the threads
+that are responsible for receiving the data and `onStop()` would ensure that 
those threads stop
+receiving data. The receiving threads can also use `isStopped()`, a `Receiver` 
method, to check whether they
+should stop receiving data.
+
+Once the data is received, that data can be stored inside Spark
+by calling `store(data)`, which is a method provided by the
+[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) 
class.
+There are a number of flavours of `store()` which allow you to store the received 
data
+record-at-a-time or as a whole collection of objects / serialized bytes.
+
+Any exception in the receiving threads should be caught and handled properly 
to avoid silent
+failures of the receiver. `restart(<exception>)` will restart the receiver by
+asynchronously calling `onStop()` and then calling `onStart()` after a delay.
+`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, 
`reportError(<error>)`
+reports an error message to the driver (visible in the logs and UI) without 
stopping / restarting
+the receiver.
+
+The following is a custom receiver that receives a stream of text over a 
socket. It treats
+'\n' delimited lines in the text stream as records and stores them with Spark. 
If the receiving thread
+has any error connecting or receiving, the receiver is restarted to make 
another attempt to connect.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1" >
 
-This guide shows the programming model and features by walking through a 
simple sample receiver and corresponding Spark Streaming application.
+{% highlight scala %}
 
-### Writing a Simple Receiver
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+   // There is nothing much to do as the thread calling receive()
+   // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+    var socket: Socket = null
+    var userInput: String = null
+    try {
+     // Connect to host:port
+     socket = new Socket(host, port)
+
+     // Until stopped or connection broken continue reading
+     val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream(), "UTF-8"))
+     userInput = reader.readLine()
+     while(!isStopped && userInput != null) {
+       store(userInput)
+       userInput = reader.readLine()
+     }
+     reader.close()
+     socket.close()
+
+     // Restart in an attempt to connect again when server is active again
+     restart("Trying to connect again")
+    } catch {
+     case e: java.net.ConnectException =>
+       // restart if could not connect to server
+       restart("Error connecting to " + host + ":" + port, e)
+     case t: Throwable =>
+       // restart if there is any other error
+       restart("Error receiving data", t)
+    }
+  }
+}
 
-This starts with implementing 
[NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver).
+{% endhighlight %}
 
-The following is a simple socket text-stream receiver.
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+
+public class JavaCustomReceiver extends Receiver<String> {
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread()  {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      // restart if there is any other error
+      restart("Error receiving data", t);
+    }
+  }
+}
 
-{% highlight scala %}
-       class SocketTextStreamReceiver(host: String, port: Int)
-         extends NetworkReceiver[String]
-       {
-         protected lazy val blocksGenerator: BlockGenerator =
-           new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
-
-         protected def onStart() = {
-           blocksGenerator.start()
-           val socket = new Socket(host, port)
-           val dataInputStream = new BufferedReader(new 
InputStreamReader(socket.getInputStream(), "UTF-8"))
-           var data: String = dataInputStream.readLine()
-           while (data != null) {
-             blocksGenerator += data
-             data = dataInputStream.readLine()
-           }
-         }
-
-         protected def onStop() {
-           blocksGenerator.stop()
-         }
-       }
 {% endhighlight %}
 
+</div>
+</div>
 
-All we did here is extended NetworkReceiver and called blockGenerator's API 
method (i.e. +=) to push our blocks of data. Please refer to scala-docs of 
NetworkReceiver for more details.
 
+### Using the custom receiver in a Spark Streaming application
 
-### An Actor as Receiver
+The custom receiver can be used in a Spark Streaming application by using
+`streamingContext.receiverStream(<instance of custom receiver>)`. This will 
create
+an input DStream using data received by the instance of the custom receiver, as shown 
below.
 
-This starts with implementing [Actor](#References)
-
-Following is a simple socket text-stream receiver, which is appearently overly 
simplified using Akka's socket.io api.
+<div class="codetabs">
+<div data-lang="scala"  markdown="1" >
 
 {% highlight scala %}
-       class SocketTextStreamReceiver (host:String,
-         port:Int,
-         bytesToString: ByteString => String) extends Actor with Receiver {
-
-          override def preStart = IOManager(context.system).connect(host, port)
-
-          def receive = {
-           case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
-         }
-
-       }
+// Assuming ssc is the StreamingContext
+val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
+val words = customReceiverStream.flatMap(_.split(" "))
+...
 {% endhighlight %}
 
-All we did here is mixed in trait Receiver and called pushBlock api method to 
push our blocks of data. Please refer to scala-docs of Receiver for more 
details.
-
-### A Sample Spark Application
+The full source code is in the example 
[CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
 
-* First create a Spark streaming context with master url and batchduration.
+</div>
+<div data-lang="java" markdown="1">
 
-{% highlight scala %}
-    val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
-      Seconds(batchDuration))
+{% highlight java %}
+// Assuming ssc is the JavaStreamingContext
+JavaDStream<String> customReceiverStream = ssc.receiverStream(new 
JavaCustomReceiver(host, port));
+JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, 
String>() { ... });
+...
 {% endhighlight %}
 
-* Plug-in the custom receiver into the spark streaming context and create a 
DStream.
+The full source code is in the example 
[JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java).
 
-{% highlight scala %}
-    val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
-      "localhost", 8445))
-{% endhighlight %}
+</div>
+</div>
 
-* OR Plug-in the actor as receiver into the spark streaming context and create 
a DStream.
 
-{% highlight scala %}
-    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-      "localhost",8445, z => z.utf8String)),"SocketReceiver")
-{% endhighlight %}
 
-* Process it.
+### Implementing and Using a Custom Actor-based Receiver
 
-{% highlight scala %}
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can 
also be used to
+receive data. The 
[`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
+trait can be applied on any Akka actor, which allows received data to be 
stored in Spark using
+ `store(...)` methods. The supervisor strategy of this actor can be configured 
to handle failures, etc.
 
-    wordCounts.print()
-    ssc.start()
+{% highlight scala %}
+class CustomActor extends Actor with ActorHelper {
+  def receive = {
+   case data: String => store(data)
+  }
+}
 {% endhighlight %}
 
-* After processing it, stream can be tested using the netcat utility.
-
-     $ nc -l localhost 8445
-     hello world
-     hello hello
-
-
-## Multiple Homogeneous/Heterogeneous Receivers.
-
-A DStream union operation is provided for taking union on multiple input 
streams.
+And a new input stream can be created with this custom actor as
 
 {% highlight scala %}
-    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-      "localhost",8445, z => z.utf8String)),"SocketReceiver")
-
-    // Another socket stream receiver
-    val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
-      "localhost",8446, z => z.utf8String)),"SocketReceiver")
-
-    val union = lines.union(lines2)
+// Assuming ssc is the StreamingContext
+val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
 {% endhighlight %}
 
-Above stream can be easily process as described earlier.
-
-_A more comprehensive example is provided in the spark streaming examples_
+See 
[ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala)
+for an end-to-end example.
 
-## References
 
-1.[Akka Actor 
documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
-2.[NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver)
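
The receive loop used by the new custom receiver examples (store each record until the receiver is stopped or the input ends, then ask for a restart) can be tried out without Spark on the classpath. The following is a minimal sketch under that assumption: `ReceiveLoopSketch` and its `store`, `restart` and `isStopped` members are hypothetical stand-ins that only mimic the `Receiver` methods named in the guide, and a `StringReader` takes the place of the socket.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the receive loop of a custom receiver; no Spark classes are used. */
public class ReceiveLoopSketch {

  // Hypothetical stand-ins for what Receiver.store(...) and restart(...) would do
  final List<String> stored = new ArrayList<>();
  final List<String> restartRequests = new ArrayList<>();
  volatile boolean stopped = false;

  void store(String record) { stored.add(record); }

  void restart(String message) { restartRequests.add(message); }

  boolean isStopped() { return stopped; }

  /** Read newline-delimited records until stopped or the input ends, then request a restart. */
  void receive(BufferedReader reader) {
    try {
      String line;
      while (!isStopped() && (line = reader.readLine()) != null) {
        store(line);
      }
      reader.close();
      // Input ended or receiver stopped: ask to be restarted so a new connection is attempted
      restart("Trying to connect again");
    } catch (IOException e) {
      // Any read error also leads to a restart request instead of a silent failure
      restart("Error receiving data: " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    ReceiveLoopSketch sketch = new ReceiveLoopSketch();
    sketch.receive(new BufferedReader(new StringReader("hello world\nhello hello\n")));
    System.out.println(sketch.stored);          // prints [hello world, hello hello]
    System.out.println(sketch.restartRequests); // prints [Trying to connect again]
  }
}
```

In a real receiver the loop runs on a thread started from `onStart()`, so that `onStart()` itself returns promptly.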

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index b22bb45..e8b718b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -136,7 +136,7 @@ The complete code can be found in the Spark Streaming 
example
 <div data-lang="java" markdown="1">
 
 First, we create a
-[JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
 object,
+[JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html)
 object,
 which is the main entry point for all streaming
 functionality. Besides Spark's configuration, we specify that any DStream 
would be processed
 in 1 second batches.
@@ -155,7 +155,7 @@ by specifying the IP address and port of the data server.
 
 {% highlight java %}
 // Create a DStream that will connect to serverIP:serverPort, like 
localhost:9999
-JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
+JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 
9999);
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from 
the data
@@ -863,6 +863,51 @@ For DStreams that must be checkpointed (that is, DStreams 
created by `updateStat
 `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the 
DStream is by
 default set to a multiple of the DStream's sliding interval such that its at 
least 10 seconds.
 
+## Deployment
+A Spark Streaming application is deployed on a cluster in the same way as any 
other Spark application.
+Please refer to the [deployment guide](cluster-overview.html) for more details.
+
+If a running Spark Streaming application needs to be upgraded (with new 
application code), then
+there are two possible mechanisms.
+
+- The upgraded Spark Streaming application is started and run in parallel to 
the existing application.
+Once the new one (receiving the same data as the old one) has been warmed up 
and is ready
+for prime time, the old one can be brought down. Note that this can be done 
for data sources that support
+sending the data to two destinations (i.e., the earlier and upgraded 
applications).
+
+- The existing application is shut down gracefully (see
+[`StreamingContext.stop(...)`](api/scala/index.html#org.apache.spark.streaming.StreamingContext)
+or 
[`JavaStreamingContext.stop(...)`](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html)
+for graceful shutdown options), which ensures that data that has been received is 
completely
+processed before shutdown. Then the
+upgraded application can be started, which will start processing from the same 
point where the earlier
+application left off. Note that this can be done only with input sources that 
support source-side buffering
+(like Kafka and Flume), as data needs to be buffered while the previous 
application is down and
+the upgraded application is not yet up.
+
+## Monitoring
+Beyond Spark's [monitoring capabilities](monitoring.html), there are 
additional capabilities
+specific to Spark Streaming. When a StreamingContext is used, the
+[Spark web UI](monitoring.html#web-interfaces) shows
+an additional `Streaming` tab which shows statistics about running receivers 
(whether
+receivers are active, number of records received, receiver error, etc.)
+and completed batches (batch processing times, queueing delays, etc.). This 
can be used to
+monitor the progress of the streaming application.
+
+The following two metrics in the web UI are particularly important:
+*Processing Time* and *Scheduling Delay* (under *Batch Processing 
Statistics*). The first is the
+time to process each batch of data, and the second is the time a batch waits 
in a queue
+for the processing of previous batches to finish. If the batch processing time 
is consistently more
+than the batch interval and/or the queueing delay keeps increasing, then it 
indicates the system is
+not able to process the batches as fast as they are being generated and is falling 
behind.
+In that case, consider
+[reducing](#reducing-the-processing-time-of-each-batch) the batch processing 
time.
+
+The progress of a Spark Streaming program can also be monitored using the
+[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener)
 interface,
+which allows you to get receiver status and processing times. Note that this 
is a developer API
+and it is likely to be improved upon (i.e., more information reported) in the 
future.
+
 
***************************************************************************************************
  
 
 # Performance Tuning
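
The rule of thumb in the new Monitoring section (processing time consistently above the batch interval, and/or scheduling delay that keeps growing, means the application is falling behind) can be written down as a small check. This is a sketch only: the class name, the method names, and the strict reading of "consistently" and "keeps increasing" over a window of samples are assumptions made for illustration, not anything provided by Spark.

```java
public class BatchHealthSketch {

  /** True if every sampled processing time exceeds the batch interval. */
  static boolean processingTooSlow(long batchIntervalMs, long[] processingTimesMs) {
    if (processingTimesMs.length == 0) {
      return false;
    }
    for (long t : processingTimesMs) {
      if (t <= batchIntervalMs) {
        return false;
      }
    }
    return true;
  }

  /** True if the scheduling delay grows from each sample to the next. */
  static boolean delayKeepsIncreasing(long[] schedulingDelaysMs) {
    if (schedulingDelaysMs.length < 2) {
      return false;
    }
    for (int i = 1; i < schedulingDelaysMs.length; i++) {
      if (schedulingDelaysMs[i] <= schedulingDelaysMs[i - 1]) {
        return false;
      }
    }
    return true;
  }

  /** Combines both symptoms described in the Monitoring section. */
  static boolean isFallingBehind(long batchIntervalMs,
                                 long[] processingTimesMs,
                                 long[] schedulingDelaysMs) {
    return processingTooSlow(batchIntervalMs, processingTimesMs)
        || delayKeepsIncreasing(schedulingDelaysMs);
  }

  public static void main(String[] args) {
    long batchIntervalMs = 1000;                // 1 second batches
    long[] processingTimesMs = {1200, 1300, 1250};
    long[] schedulingDelaysMs = {200, 450, 700};
    System.out.println(
        isFallingBehind(batchIntervalMs, processingTimesMs, schedulingDelaysMs)); // prints true
  }
}
```

The sample values would be read off the Streaming tab or collected through a StreamingListener; how many samples to look at is a judgment call.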
@@ -875,7 +920,8 @@ improve the performance of you application. At a high 
level, you need to conside
   Reducing the processing time of each batch of data by efficiently using 
cluster resources.
 </li>
 <li>
-  Setting the right batch size such that the data processing can keep up with 
the data ingestion.
+  Setting the right batch size such that the batches of data can be processed 
as fast as they
+  are received (that is, data processing keeps up with the data ingestion).
 </li>
 </ol>
 
@@ -884,7 +930,30 @@ There are a number of optimizations that can be done in 
Spark to minimize the pr
 each batch. These have been discussed in detail in [Tuning 
Guide](tuning.html). This section
 highlights some of the most important ones.
 
-### Level of Parallelism
+### Level of Parallelism in Data Receiving
+Receiving data over the network (like Kafka, Flume, socket, etc.) requires the 
data to be deserialized
+and stored in Spark. If the data receiving becomes a bottleneck in the system, 
then consider
+parallelizing the data receiving. Note that each input DStream
+creates a single receiver (running on a worker machine) that receives a single 
stream of data.
+Receiving multiple data streams can therefore be achieved by creating multiple 
input DStreams
+and configuring them to receive different partitions of the data stream from 
the source(s).
+For example, a single Kafka input stream receiving two topics of data can be 
split into two
+Kafka input streams, each receiving only one topic. This would run two 
receivers on two workers,
+thus allowing data to be received in parallel, and increasing overall throughput.
+
+Another parameter that should be considered is the receiver's blocking 
interval. For most receivers,
+the received data is coalesced together into large blocks of data before 
storing inside Spark's memory.
+The number of blocks in each batch determines the number of tasks that will be 
used to process
+the received data in a map-like transformation. This blocking interval is 
determined by the
+[configuration parameter](configuration.html) `spark.streaming.blockInterval` 
and the default value
+is 200 milliseconds.
+
+An alternative to receiving data with multiple input streams / receivers is to 
explicitly repartition
+the input data stream (using `inputStream.repartition(<number of 
partitions>)`).
+This distributes the received batches of data across all the machines in the 
cluster
+before further processing.
+
+### Level of Parallelism in Data Processing
 Cluster resources maybe under-utilized if the number of parallel tasks used in 
any stage of the
 computation is not high enough. For example, for distributed reduce operations 
like `reduceByKey`
 and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can 
pass the level of
@@ -921,16 +990,22 @@ These changes may reduce batch processing time by 100s of 
milliseconds,
 thus allowing sub-second batch size to be viable.
 
 ## Setting the Right Batch Size
-For a Spark Streaming application running on a cluster to be stable, the 
processing of the data
-streams must keep up with the rate of ingestion of the data streams. Depending 
on the type of
-computation, the batch size used may have significant impact on the rate of 
ingestion that can be
-sustained by the Spark Streaming application on a fixed cluster resources. For 
example, let us
+For a Spark Streaming application running on a cluster to be stable, the 
system should be able to
+process data as fast as it is being received. In other words, batches of data 
should be processed
+as fast as they are being generated. Whether this is true for an application 
can be found by
+[monitoring](#monitoring) the processing times in the streaming web UI, where 
the batch
+processing time should be less than the batch interval.
+
+Depending on the nature of the streaming
+computation, the batch interval used may have significant impact on the data 
rates that can be
+sustained by the application on a fixed set of cluster resources. For example, 
let us
 consider the earlier WordCountNetwork example. For a particular data rate, the 
system may be able
-to keep up with reporting word counts every 2 seconds (i.e., batch size of 2 
seconds), but not
-every 500 milliseconds.
+to keep up with reporting word counts every 2 seconds (i.e., batch interval of 
2 seconds), but not
+every 500 milliseconds. So the batch interval needs to be set such that the 
expected data rate in
+production can be sustained.
 
 A good approach to figure out the right batch size for your application is to 
test it with a
-conservative batch size (say, 5-10 seconds) and a low data rate. To verify 
whether the system
+conservative batch interval (say, 5-10 seconds) and a low data rate. To verify 
whether the system
 is able to keep up with data rate, you can check the value of the end-to-end 
delay experienced
 by each processed batch (either look for "Total delay" in Spark driver log4j 
logs, or use the
 
[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener)
@@ -942,29 +1017,6 @@ data rate and/or reducing the batch size. Note that 
momentary increase in the de
 temporary data rate increases maybe fine as long as the delay reduces back to 
a low value
 (i.e., less than batch size).
 
-## 24/7 Operation
-By default, Spark does not forget any of the metadata (RDDs generated, stages 
processed, etc.).
-But for a Spark Streaming application to operate 24/7, it is necessary for 
Spark to do periodic
-cleanup of it metadata. This can be enabled by setting the
-[configuration property](configuration.html#spark-properties) 
`spark.cleaner.ttl` to the number of
-seconds you want any metadata to persist. For example, setting 
`spark.cleaner.ttl` to 600 would
-cause Spark periodically cleanup all metadata and persisted RDDs that are 
older than 10 minutes.
-Note, that this property needs to be set before the SparkContext is created.
-
-This value is closely tied with any window operation that is being used. Any 
window operation
-would require the input data to be persisted in memory for at least the 
duration of the window.
-Hence it is necessary to set the delay to at least the value of the largest 
window operation used
-in the Spark Streaming application. If this delay is set too low, the 
application will throw an
-exception saying so.
-
-## Monitoring
-Besides Spark's in-built [monitoring capabilities](monitoring.html),
-the progress of a Spark Streaming program can also be monitored using the 
[StreamingListener]
-(api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface,
-which allows you to get statistics of batch processing times, queueing delays,
-and total end-to-end delays. Note that this is still an experimental API and 
it is likely to be
-improved upon (i.e., more information reported) in the future.
-
 ## Memory Tuning
 Tuning the memory usage and GC behavior of Spark applications have been 
discussed in great detail
 in the [Tuning Guide](tuning.html). It is recommended that you read that. In 
this section,
@@ -1249,18 +1301,80 @@ in the file. This is what the sequence of outputs would 
be with and without a dr
 If the driver had crashed in the middle of the processing of time 3, then it 
will process time 3
 and output 30 after recovery.
 
+***************************************************************************************************
  
+
+# Migration Guide from 0.9.1 or below to 1.x
+Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure 
future API stability.
+This section elaborates the steps required to migrate your existing code to 
1.0.
+
+**Input DStreams**: All operations that create an input stream (e.g., 
`StreamingContext.socketStream`,
+`FlumeUtils.createStream`, etc.) now return
+[InputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream)
 /
+[ReceiverInputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream)
+(instead of DStream) for Scala, and 
[JavaInputDStream](api/java/org/apache/spark/streaming/api/java/JavaInputDStream.html)
 /
+[JavaPairInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairInputDStream.html)
 /
+[JavaReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.html)
 /
+[JavaPairReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html)
+(instead of JavaDStream) for Java. This ensures that functionality specific to 
input streams can
+be added to these classes in the future without breaking binary compatibility.
+Note that your existing Spark Streaming applications should not require any 
change
+(as these new classes are subclasses of DStream/JavaDStream) but may require 
recompilation with Spark 1.0.
+
+**Custom Network Receivers**: Since the release of Spark Streaming, custom 
network receivers could be defined
+in Scala using the class NetworkReceiver. However, the API was limited in 
terms of error handling
+and reporting, and could not be used from Java. Starting with Spark 1.0, this class 
has been
+replaced by 
[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) 
which has
+the following advantages.
+
+* Methods like `stop` and `restart` have been added for better control of 
the lifecycle of a receiver. See
+the [custom receiver guide](streaming-custom-receiver.html) for more details.
+* Custom receivers can be implemented using both Scala and Java.
+
+To migrate your existing custom receivers from the earlier NetworkReceiver to 
the new Receiver, you have
+to do the following.
+
+* Make your custom receiver class extend
+[`org.apache.spark.streaming.receiver.Receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver)
+instead of `org.apache.spark.streaming.dstream.NetworkReceiver`.
+* Earlier, the custom receiver had to create a BlockGenerator object, to which received data was
+added to be stored in Spark. It had to be explicitly started and stopped from the `onStart()` and `onStop()`
+methods. The new Receiver class makes this unnecessary, as it adds a set of methods named `store(<data>)`
+that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any
+BlockGenerator object (it no longer exists in Spark 1.0 anyway), and call the `store(...)` methods on
+received data.
+
+**Actor-based Receivers**: Previously, data could be received using any Akka actor by extending the actor class with
+the `org.apache.spark.streaming.receivers.Receiver` trait. This trait has been renamed to
+[`org.apache.spark.streaming.receiver.ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
+and the `pushBlock(...)` methods for storing received data have been renamed to `store(...)`. Other helper classes in
+the `org.apache.spark.streaming.receivers` package were also moved
+to 
[`org.apache.spark.streaming.receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.package)
+package and renamed for better clarity.
+
+***************************************************************************************************
+
 # Where to Go from Here
 
 * API documentation
-  - Main docs of StreamingContext and DStreams in 
[Scala](api/scala/index.html#org.apache.spark.streaming.package)
-    and 
[Java](api/scala/index.html#org.apache.spark.streaming.api.java.package)
-  - Additional docs for
-    [Kafka](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$),
-    [Flume](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$),
-    
[Twitter](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
-    
[ZeroMQ](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), 
and
-    [MQTT](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$)
+  - Scala docs
+    * 
[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext)
 and
+  [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
+    * 
[KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$),
+    
[FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$),
+    
[TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
+    
[ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$),
 and
+    
[MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$)
+  - Java docs
+    * 
[JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html),
+    
[JavaDStream](api/java/org/apache/spark/streaming/api/java/JavaDStream.html) and
+    
[JavaPairDStream](api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html)
+    * [KafkaUtils](api/java/org/apache/spark/streaming/kafka/KafkaUtils.html),
+    [FlumeUtils](api/java/org/apache/spark/streaming/flume/FlumeUtils.html),
+    
[TwitterUtils](api/java/org/apache/spark/streaming/twitter/TwitterUtils.html),
+    
[ZeroMQUtils](api/java/org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
+    [MQTTUtils](api/java/org/apache/spark/streaming/mqtt/MQTTUtils.html)
 
 * More examples in 
[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples)
   and 
[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples)
-* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) 
describing Spark Streaming.
+* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) 
and
+[video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
index a94fa62..e36c780 100644
--- 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
+++ 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
@@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.receiver.Receiver;
 import scala.Tuple2;
@@ -69,7 +70,7 @@ public class JavaCustomReceiver extends Receiver<String> {
 
     // Create an input stream with the custom receiver on target ip:port and 
count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.receiverStream(
+    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
       new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, 
String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index f061001..c59f753 100644
--- 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -58,7 +58,7 @@ public final class JavaFlumeEventCount {
     JavaStreamingContext ssc = new JavaStreamingContext(master, 
"FlumeEventCount", batchInterval,
             System.getenv("SPARK_HOME"),
             JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, 
"localhost", port);
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createStream(ssc, "localhost", port);
 
     flumeStream.count();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index d704be0..8da9bcd 100644
--- 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
@@ -73,7 +74,8 @@ public final class JavaKafkaWordCount {
       topicMap.put(topic, numThreads);
     }
 
-    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, 
args[1], args[2], topicMap);
+    JavaPairReceiverInputDStream<String, String> messages =
+            KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, 
String>, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 0cc9d0a..098c329 100644
--- 
a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.examples;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import scala.Tuple2;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
@@ -57,9 +58,9 @@ public final class JavaNetworkWordCount {
             new Duration(1000), System.getenv("SPARK_HOME"),
             JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.socketTextStream(args[1], 
Integer.parseInt(args[2]));
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], 
Integer.parseInt(args[2]));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, 
String>() {
       @Override
       public Iterable<String> call(String x) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index b310c22..5acf8a9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -28,9 +28,11 @@ import org.apache.spark.annotation.DeveloperApi
 /**
  * :: DeveloperApi ::
  * Abstract class of a receiver that can be run on worker nodes to receive 
external data. A
- * custom receiver can be defined by defining the functions onStart() and 
onStop(). onStart()
+ * custom receiver can be defined by defining the functions `onStart()` and 
`onStop()`. `onStart()`
  * should define the setup steps necessary to start receiving data,
- * and onStop() should define the cleanup steps necessary to stop receiving 
data.
+ * and `onStop()` should define the cleanup steps necessary to stop receiving 
data.
+ * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
+ * or by stopping it completely with `stop(...)`.
  *
  * A custom receiver in Scala would look like this.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/1fac4ecb/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ee0bc8b..cd86019 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -183,6 +183,7 @@ class StreamingContextSuite extends FunSuite with 
BeforeAndAfter with Timeouts w
         "Received records = " + TestReceiver.counter.get() + ", " +
           "processed records = " + runningCount
       )
+      Thread.sleep(100)
     }
   }
 
