I realized that there's an error in the code. Corrected:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         10000)

events = sc.broadcast(25)


def test(rdd):
    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()

On Fri, Jul 22, 2016 at 1:50 PM, Joe Panciera <joe.panci...@gmail.com> wrote:

> Hi,
>
> I'm attempting to use broadcast variables to update stateful values used
> across the cluster for processing. Essentially, I have a function that is
> executed in .foreachRDD that updates the broadcast variable by calling
> unpersist() and then rebroadcasting. This works without issues when I
> execute the code without checkpointing, but as soon as I include
> checkpointing it seems to be unable to pickle the function. I get this
> error:
>
> *It appears that you are attempting to reference SparkContext from a
> broadcast*
>
> File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
> 268, in __getnewargs__
>     "It appears that you are attempting to reference SparkContext from a
> broadcast "
> Exception: It appears that you are attempting to reference SparkContext
> from a broadcast variable, action, or transformation. SparkContext can only
> be used on the driver, not in code that it run on workers. For more
> information, see SPARK-5063.
>
> at
> org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
> at
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> ... 61 more
>
> Here's some simple code that shows this occurring:
>
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
>
> sc = SparkContext(appName="FileAutomation")
>
> # Create streaming context from existing spark context
> ssc = StreamingContext(sc, 10)
> alert_stream = KinesisUtils.createStream(ssc,
>                                          "Events",      # App Name
>                                          "Event_Test",  # Stream Name
>                                          "https://kinesis.us-west-2.amazonaws.com",
>                                          "us-west-2",
>                                          InitialPositionInStream.LATEST,
>                                          10000)
>
> events = sc.broadcast(25)
>
>
> def test(rdd):
>     global events
>     num = events.value
>     print num
>
>     events.unpersist()
>     events = sc.broadcast(num + 1)
>
>
> events.foreachRDD(test)
>
> # Comment this line and no error occurs
> ssc.checkpoint('dir')
> ssc.start()
> ssc.awaitTermination()
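For what it's worth, the frame in the traceback (context.py, __getnewargs__) is pyspark deliberately raising: SparkContext defines __getnewargs__ only so that any attempt to pickle it fails loudly (SPARK-5063). Checkpointing pickles the foreachRDD function, and since test references sc for the rebroadcast, the context gets dragged into that pickle. A minimal sketch of the mechanism, no Spark required (FakeSparkContext and ForeachFunc are hypothetical stand-ins, not pyspark classes):

```python
import pickle


class FakeSparkContext(object):
    """Hypothetical stand-in for SparkContext, which defines
    __getnewargs__ solely to raise if it is ever pickled."""
    def __getnewargs__(self):
        raise Exception(
            "It appears that you are attempting to reference SparkContext "
            "from a broadcast variable, action, or transformation.")


class ForeachFunc(object):
    """Hypothetical stand-in for the checkpointed foreachRDD function:
    like test() above, it holds a reference to the driver-side context."""
    def __init__(self, ctx):
        self.ctx = ctx  # the closure over `sc`

    def __call__(self, rdd):
        pass


sc = FakeSparkContext()
err = None
try:
    # Checkpointing does the moral equivalent of this serialization:
    # pickling the function reaches `ctx`, whose __getnewargs__ raises.
    pickle.dumps(ForeachFunc(sc))
except Exception as e:
    err = str(e)

print(err)
```

So the error only appears once checkpointing is on because that is the first time the function (and everything it references) has to be serialized.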