Repository: spark Updated Branches: refs/heads/branch-1.0 395656c8e -> 66d9070bd
Update RecoverableNetworkWordCount.scala Trying this example, I missed the moment when the checkpoint was initiated Author: comcmipi <pito...@fns.uniba.sk> Closes #2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d9070b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d9070b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d9070b Branch: refs/heads/branch-1.0 Commit: 66d9070bde2d83e4857612143228e59c9e5517f1 Parents: 395656c Author: comcmipi <pito...@fns.uniba.sk> Authored: Mon Nov 10 12:33:48 2014 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Nov 10 12:42:51 2014 -0800 ---------------------------------------------------------------------- .../spark/examples/streaming/RecoverableNetworkWordCount.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/66d9070b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 6af3a0f..d90a84d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -69,7 +69,7 @@ import org.apache.spark.util.IntParam object RecoverableNetworkWordCount { - def createContext(ip: String, port: 
Int, outputPath: String, checkpointDirectory: String) = { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint @@ -79,6 +79,7 @@ object RecoverableNetworkWordCount { val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) + ssc.checkpoint(checkpointDirectory) // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -114,7 +115,7 @@ object RecoverableNetworkWordCount { val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { - createContext(ip, port, outputPath) + createContext(ip, port, outputPath, checkpointDirectory) }) ssc.start() ssc.awaitTermination() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org