Repository: spark
Updated Branches:
  refs/heads/branch-1.0 395656c8e -> 66d9070bd


Update RecoverableNetworkWordCount.scala

Trying this example, I missed the moment when the checkpoint was initiated

Author: comcmipi <pito...@fns.uniba.sk>

Closes #2735 from comcmipi/patch-1 and squashes the following commits:

b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala
96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d9070b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d9070b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d9070b

Branch: refs/heads/branch-1.0
Commit: 66d9070bde2d83e4857612143228e59c9e5517f1
Parents: 395656c
Author: comcmipi <pito...@fns.uniba.sk>
Authored: Mon Nov 10 12:33:48 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 10 12:42:51 2014 -0800

----------------------------------------------------------------------
 .../spark/examples/streaming/RecoverableNetworkWordCount.scala  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/66d9070b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 6af3a0f..d90a84d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam
 
 object RecoverableNetworkWordCount {
 
-  def createContext(ip: String, port: Int, outputPath: String) = {
+  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
 
     // If you do not see this printed, that means the StreamingContext has been loaded
     // from the new checkpoint
@@ -79,6 +79,7 @@ object RecoverableNetworkWordCount {
     val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
+    ssc.checkpoint(checkpointDirectory)
 
     // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -114,7 +115,7 @@ object RecoverableNetworkWordCount {
     val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
     val ssc = StreamingContext.getOrCreate(checkpointDirectory,
       () => {
-        createContext(ip, port, outputPath)
+        createContext(ip, port, outputPath, checkpointDirectory)
       })
     ssc.start()
     ssc.awaitTermination()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to