I realized that there's an error in the code: foreachRDD was called on the broadcast variable instead of the stream. Corrected:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)

alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App name
                                         "Event_Test",  # Stream name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         1)

events = sc.broadcast(25)


def test(rdd):
    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
On Fri, Jul 22, 2016 at 1:50 PM, Joe Panciera wrote:
> Hi,
>
> I'm attempting to use broadcast variables to update stateful values used
> across the cluster for processing. Essentially, I have a function that is
> executed in .foreachRDD that updates the broadcast variable by calling
> unpersist() and then rebroadcasting. This works without issues when I
> execute the code without checkpointing, but as soon as I include
> checkpointing it seems to be unable to pickle the function. I get this
> error:
>
> *It appears that you are attempting to reference SparkContext from a
> broadcast *
>
> File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
> 268, in __getnewargs__
> "It appears that you are attempting to reference SparkContext from a
> broadcast "
> Exception: It appears that you are attempting to reference SparkContext
> from a broadcast variable, action, or transformation. SparkContext can only
> be used on the driver, not in code that it run on workers. For more
> information, see SPARK-5063.
>
> at
> org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
> at
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> ... 61 more
>
>
> Here's some simple code that shows this occurring:
>
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
>
> sc = SparkContext(appName="FileAutomation")
>
> # Create streaming context from existing spark context
> ssc = StreamingContext(sc, 10)
>
> alert_stream = KinesisUtils.createStream(ssc,
>                                          "Events",      # App name
>                                          "Event_Test",  # Stream name
>                                          "https://kinesis.us-west-2.amazonaws.com",
>                                          "us-west-2",
>                                          InitialPositionInStream.LATEST,
>                                          1)
>
> events = sc.broadcast(25)
>
>
> def test(rdd):
>     global events
>     num = events.value
>     print num
>
>     events.unpersist()
>     events = sc.broadcast(num + 1)
>
>
> events.foreachRDD(test)
>
> # Comment this line and no error occurs
> ssc.checkpoint('dir')
> ssc.start()
> ssc.awaitTermination()