Repository: spark
Updated Branches:
  refs/heads/branch-1.6 94fb5e870 -> 942c0577b


[SPARK-12429][STREAMING][DOC] Add Accumulator and Broadcast example for 
Streaming

This PR adds Scala, Java and Python examples showing how to use Accumulators and 
Broadcast variables in Spark Streaming with checkpointing enabled.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10385 from zsxwing/accumulator-broadcast-example.

(cherry picked from commit 20591afd790799327f99485c5a969ed7412eca45)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/942c0577
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/942c0577
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/942c0577

Branch: refs/heads/branch-1.6
Commit: 942c0577b201a08fffdcaf71e4d1867266ae309e
Parents: 94fb5e8
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Dec 22 16:39:10 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Dec 22 16:39:20 2015 -0800

----------------------------------------------------------------------
 docs/programming-guide.md                       |   6 +-
 docs/streaming-programming-guide.md             | 165 +++++++++++++++++++
 .../JavaRecoverableNetworkWordCount.java        |  71 +++++++-
 .../streaming/recoverable_network_wordcount.py  |  30 +++-
 .../streaming/RecoverableNetworkWordCount.scala |  66 +++++++-
 5 files changed, 325 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/942c0577/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f823b89..010346e 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more 
complicated, and the above may
 
 What is happening here is that the variables within the closure sent to each 
executor are now copies and thus, when **counter** is referenced within the 
`foreach` function, it's no longer the **counter** on the driver node. There is 
still a **counter** in the memory of the driver node but this is no longer 
visible to the executors! The executors only see the copy from the serialized 
closure. Thus, the final value of **counter** will still be zero since all 
operations on **counter** were referencing the value within the serialized 
closure.  
 
-To ensure well-defined behavior in these sorts of scenarios one should use an 
[`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to 
provide a mechanism for safely updating a variable when execution is split up 
across worker nodes in a cluster. The Accumulators section of this guide 
discusses these in more detail.  
+To ensure well-defined behavior in these sorts of scenarios one should use an 
[`Accumulator`](#accumulators). Accumulators in Spark are used specifically to 
provide a mechanism for safely updating a variable when execution is split up 
across worker nodes in a cluster. The Accumulators section of this guide 
discusses these in more detail.  
 
 In general, closures - constructs like loops or locally defined methods, 
should not be used to mutate some global state. Spark does not define or 
guarantee the behavior of mutations to objects referenced from outside of 
closures. Some code that does this may work in local mode, but that's just by 
accident and such code will not behave as expected in distributed mode. Use an 
Accumulator instead if some global aggregation is needed.
 
@@ -1091,7 +1091,7 @@ for details.
 </tr>
 <tr>
   <td> <b>foreach</b>(<i>func</i>) </td>
-  <td> Run a function <i>func</i> on each element of the dataset. This is 
usually done for side effects such as updating an <a 
href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+  <td> Run a function <i>func</i> on each element of the dataset. This is 
usually done for side effects such as updating an <a 
href="#accumulators">Accumulator</a> or interacting with external storage 
systems.
   <br /><b>Note</b>: modifying variables other than Accumulators outside of 
the <code>foreach()</code> may result in undefined behavior. See <a 
href="#ClosuresLink">Understanding closures </a> for more details.</td>
 </tr>
 </table>
@@ -1336,7 +1336,7 @@ run on the cluster so that `v` is not shipped to the 
nodes more than once. In ad
 `v` should not be modified after it is broadcast in order to ensure that all 
nodes get the same
 value of the broadcast variable (e.g. if the variable is shipped to a new node 
later).
 
-## Accumulators <a name="AccumLink"></a>
+## Accumulators
 
 Accumulators are variables that are only "added" to through an associative 
operation and can
 therefore be efficiently supported in parallel. They can be used to implement 
counters (as in
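
Not part of this diff: a minimal, self-contained sketch of the closure-counter pitfall the updated prose 
above refers to, and the Accumulator alternative, assuming the Spark 1.x RDD API (the object and app 
names below are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}

    object CounterVsAccumulator {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("CounterVsAccumulator"))
        val data = sc.parallelize(1 to 100)

        // Undefined in cluster mode: each executor increments its own serialized copy
        // of the closure variable, so the driver-side value may remain 0.
        var counter = 0
        data.foreach(x => counter += x)
        println(s"Closure counter (may still be 0 in cluster mode): $counter")

        // Well-defined: Accumulator updates from the executors are merged back on the driver.
        val sum = sc.accumulator(0, "sum")
        data.foreach(x => sum += x)
        println(s"Accumulator value: ${sum.value}")

        sc.stop()
      }
    }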

http://git-wip-us.apache.org/repos/asf/spark/blob/942c0577/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index ed6b28c..3b071c7 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1415,6 +1415,171 @@ Note that the connections in the pool should be lazily 
created on demand and tim
 
 ***
 
+## Accumulators and Broadcast Variables
+
+[Accumulators](programming-guide.html#accumulators) and [Broadcast 
variables](programming-guide.html#broadcast-variables) cannot be recovered from 
checkpoint in Spark Streaming. If you enable checkpointing and also use 
Accumulators or Broadcast variables, you will have to create lazily instantiated 
singleton instances for them so that they can be re-instantiated after the driver 
restarts on failure. This is shown in the following example.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+  // Get or register the blacklist Broadcast
+  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+  // Get or register the droppedWordsCounter Accumulator
+  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
+  // Use blacklist to drop words and use droppedWordsCounter to count them
+  val counts = rdd.filter { case (word, count) =>
+    if (blacklist.value.contains(word)) {
+      droppedWordsCounter += count
+      false
+    } else {
+      true
+    }
+  }.collect()
+  val output = "Counts at time " + time + " " + counts
+})
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+  @Override
+  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+    // Get or register the blacklist Broadcast
+    final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+    // Get or register the droppedWordsCounter Accumulator
+    final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+    // Use blacklist to drop words and use droppedWordsCounter to count them
+    String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
+        }
+      }
+    }).collect().toString();
+    String output = "Counts at time " + time + " " + counts;
+    return null;
+  }
+});
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+def echo(time, rdd):
+    # Get or register the blacklist Broadcast
+    blacklist = getWordBlacklist(rdd.context)
+    # Get or register the droppedWordsCounter Accumulator
+    droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+    # Use blacklist to drop words and use droppedWordsCounter to count them
+    def filterFunc(wordCount):
+        if wordCount[0] in blacklist.value:
+            droppedWordsCounter.add(wordCount[1])
+            return False
+        else:
+            return True
+
+    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
+
+wordCounts.foreachRDD(echo)
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
+
+</div>
+</div>
+
+***
+
 ## DataFrame and SQL Operations
 You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SQLContext using the SparkContext that 
the StreamingContext is using. Furthermore, this has to be done such that it can 
be restarted on driver failures. This is done by creating a lazily instantiated 
singleton instance of SQLContext. This is shown in the following example. It 
modifies the earlier [word count example](#a-quick-example) to generate word 
counts using DataFrames and SQL. Each RDD is converted to a DataFrame, 
registered as a temporary table and then queried using SQL.
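
Not part of this diff: the "lazily instantiated singleton instance of SQLContext" that this paragraph 
refers to looks roughly like the sketch below, assuming the Spark 1.6 SQLContext API (the object name 
is illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    /** Lazily instantiated singleton SQLContext so it can be re-created after a driver restart. */
    object SQLContextSingleton {

      @transient private var instance: SQLContext = _

      def getInstance(sparkContext: SparkContext): SQLContext = {
        if (instance == null) {
          instance = new SQLContext(sparkContext)
        }
        instance
      }
    }

    // Usage inside foreachRDD, mirroring the Accumulator/Broadcast pattern above:
    //   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)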
 

http://git-wip-us.apache.org/repos/asf/spark/blob/942c0577/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bceda97..90d4737 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,17 +21,22 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -41,7 +46,48 @@ import 
org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
 
 /**
- * Counts words in text encoded with UTF8 received from the network every 
second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+class JavaWordBlacklist {
+
+  private static volatile Broadcast<List<String>> instance = null;
+
+  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaWordBlacklist.class) {
+        if (instance == null) {
+          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
+          instance = jsc.broadcast(wordBlacklist);
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+class JavaDroppedWordsCounter {
+
+  private static volatile Accumulator<Integer> instance = null;
+
+  public static Accumulator<Integer> getInstance(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (JavaDroppedWordsCounter.class) {
+        if (instance == null) {
+          instance = jsc.accumulator(0, "WordsInBlacklistCounter");
+        }
+      }
+    }
+    return instance;
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every 
second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator 
and Broadcast so that
+ * they can be registered on driver failures.
  *
  * Usage: JavaRecoverableNetworkWordCount <hostname> <port> 
<checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would 
connect to receive
@@ -111,10 +157,27 @@ public final class JavaRecoverableNetworkWordCount {
     wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, 
Void>() {
       @Override
       public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws 
IOException {
-        String counts = "Counts at time " + time + " " + rdd.collect();
-        System.out.println(counts);
+        // Get or register the blacklist Broadcast
+        final Broadcast<List<String>> blacklist = 
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+        // Get or register the droppedWordsCounter Accumulator
+        final Accumulator<Integer> droppedWordsCounter = 
JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+        // Use blacklist to drop words and use droppedWordsCounter to count 
them
+        String counts = rdd.filter(new Function<Tuple2<String, Integer>, 
Boolean>() {
+          @Override
+          public Boolean call(Tuple2<String, Integer> wordCount) throws 
Exception {
+            if (blacklist.value().contains(wordCount._1())) {
+              droppedWordsCounter.add(wordCount._2());
+              return false;
+            } else {
+              return true;
+            }
+          }
+        }).collect().toString();
+        String output = "Counts at time " + time + " " + counts;
+        System.out.println(output);
+        System.out.println("Dropped " + droppedWordsCounter.value() + " 
word(s) totally");
         System.out.println("Appending to " + outputFile.getAbsolutePath());
-        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+        Files.append(output + "\n", outputFile, Charset.defaultCharset());
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/spark/blob/942c0577/examples/src/main/python/streaming/recoverable_network_wordcount.py
----------------------------------------------------------------------
diff --git 
a/examples/src/main/python/streaming/recoverable_network_wordcount.py 
b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index ac91f0a..52b2639 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -44,6 +44,20 @@ from pyspark import SparkContext
 from pyspark.streaming import StreamingContext
 
 
+# Get or register a Broadcast variable
+def getWordBlacklist(sparkContext):
+    if ('wordBlacklist' not in globals()):
+        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
+    return globals()['wordBlacklist']
+
+
+# Get or register an Accumulator
+def getDroppedWordsCounter(sparkContext):
+    if ('droppedWordsCounter' not in globals()):
+        globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
+    return globals()['droppedWordsCounter']
+
+
 def createContext(host, port, outputPath):
     # If you do not see this printed, that means the StreamingContext has been 
loaded
     # from the new checkpoint
@@ -60,8 +74,22 @@ def createContext(host, port, outputPath):
     wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
 
     def echo(time, rdd):
-        counts = "Counts at time %s %s" % (time, rdd.collect())
+        # Get or register the blacklist Broadcast
+        blacklist = getWordBlacklist(rdd.context)
+        # Get or register the droppedWordsCounter Accumulator
+        droppedWordsCounter = getDroppedWordsCounter(rdd.context)
+
+        # Use blacklist to drop words and use droppedWordsCounter to count them
+        def filterFunc(wordCount):
+            if wordCount[0] in blacklist.value:
+                droppedWordsCounter.add(wordCount[1])
+                return False
+            else:
+                return True
+
+        counts = "Counts at time %s %s" % (time, 
rdd.filter(filterFunc).collect())
         print(counts)
+        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
         print("Appending to " + os.path.abspath(outputPath))
         with open(outputPath, 'a') as f:
             f.write(counts + "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/942c0577/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 9916882..38d4fd1 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -23,13 +23,55 @@ import java.nio.charset.Charset
 
 import com.google.common.io.Files
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{Accumulator, SparkConf, SparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
 import org.apache.spark.util.IntParam
 
 /**
- * Counts words in text encoded with UTF8 received from the network every 
second.
+ * Use this singleton to get or register a Broadcast variable.
+ */
+object WordBlacklist {
+
+  @volatile private var instance: Broadcast[Seq[String]] = null
+
+  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          val wordBlacklist = Seq("a", "b", "c")
+          instance = sc.broadcast(wordBlacklist)
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Use this singleton to get or register an Accumulator.
+ */
+object DroppedWordsCounter {
+
+  @volatile private var instance: Accumulator[Long] = null
+
+  def getInstance(sc: SparkContext): Accumulator[Long] = {
+    if (instance == null) {
+      synchronized {
+        if (instance == null) {
+          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
+        }
+      }
+    }
+    instance
+  }
+}
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every 
second. This example also
+ * shows how to use lazily instantiated singleton instances for Accumulator 
and Broadcast so that
+ * they can be registered on driver failures.
  *
  * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> 
<output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would 
connect to receive
@@ -75,10 +117,24 @@ object RecoverableNetworkWordCount {
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
-      val counts = "Counts at time " + time + " " + 
rdd.collect().mkString("[", ", ", "]")
-      println(counts)
+      // Get or register the blacklist Broadcast
+      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
+      // Get or register the droppedWordsCounter Accumulator
+      val droppedWordsCounter = 
DroppedWordsCounter.getInstance(rdd.sparkContext)
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      val counts = rdd.filter { case (word, count) =>
+        if (blacklist.value.contains(word)) {
+          droppedWordsCounter += count
+          false
+        } else {
+          true
+        }
+      }.collect().mkString("[", ", ", "]")
+      val output = "Counts at time " + time + " " + counts
+      println(output)
+      println("Dropped " + droppedWordsCounter.value + " word(s) totally")
       println("Appending to " + outputFile.getAbsolutePath)
-      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+      Files.append(output + "\n", outputFile, Charset.defaultCharset())
     })
     ssc
   }

