Hi, I'm attempting to use broadcast variables to update stateful values used across the cluster for processing. Essentially, I have a function executed in `.foreachRDD` that updates the broadcast variable by calling `unpersist()` and then rebroadcasting. This works without issue when I run the code without checkpointing, but as soon as I enable checkpointing Spark seems unable to pickle the function, and I get this error:
```
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 268, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast
variable, action, or transformation. SparkContext can only be used on the driver, not
in code that it run on workers. For more information, see SPARK-5063.
        at org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
        ... 61 more
```

Here's some simple code that shows this occurring:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="FileAutomation")

# Create streaming context from the existing Spark context
ssc = StreamingContext(sc, 10)

alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App name
                                         "Event_Test",  # Stream name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         10000)

events = sc.broadcast(25)


def test(rdd):
    # Runs on the driver each batch: read the current broadcast value,
    # then unpersist it and rebroadcast the updated value.
    global events
    num = events.value
    print num
    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)  # Comment this line and no error occurs

ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
```
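For what it's worth, my rough understanding (guessing from the `PythonTransformFunctionSerializer` frames in the traceback) is that checkpointing forces Spark to serialize every DStream function with its cloudpickle-based serializer, and `test` drags `sc` into the pickle through its globals because of the rebroadcast call. Assuming the script above has already run, I'd expect this snippet to hit the same SPARK-5063 guard with no streaming involved at all:

```python
from pyspark import cloudpickle

# Serializing `test` by value also serializes the globals it references;
# pickling `sc` triggers SparkContext.__getnewargs__, which raises the
# "It appears that you are attempting to reference SparkContext" error.
cloudpickle.dumps(test)
```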