Hi,

I'm attempting to use broadcast variables to hold stateful values that are
used across the cluster during processing and updated over time.
Essentially, I have a function, executed in .foreachRDD, that updates the
broadcast variable by calling unpersist() and then re-broadcasting the new
value. This works without issues when I run the code without checkpointing,
but as soon as I enable checkpointing, Spark seems unable to pickle the
function. I get this error:


  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 268, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

        at org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
        ... 61 more


Here's a minimal script that reproduces the error:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",  # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",  # Endpoint URL
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         10000
                                         )

events = sc.broadcast(25)


def test(rdd):

    global events
    num = events.value
    print(num)

    events.unpersist()
    events = sc.broadcast(num + 1)


# Register the update function on the Kinesis DStream (not on the broadcast)
alert_stream.foreachRDD(test)

# Comment out this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
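A likely explanation, judging from the `__getnewargs__` frame in `context.py` and the `PythonTransformFunctionSerializer.serialize` / `TransformFunction.writeObject` frames in the trace: with checkpointing enabled, the function passed to `foreachRDD` is pickled (PySpark uses cloudpickle for this) so it can be written into the checkpoint, and because `test` refers to the global `sc`, the SparkContext appears to get pickled along with it, which `SparkContext.__getnewargs__` deliberately refuses. The sketch below reproduces that failure mode with the standard `pickle` module only. `DriverOnlyContext`, `UpdateFn` and `try_pickle` are hypothetical stand-ins, not PySpark classes; a callable object is used instead of a plain function because standard `pickle` stores module-level functions by reference instead of capturing what they refer to.

```python
import pickle


class DriverOnlyContext:
    """Hypothetical stand-in for SparkContext: refuses to be pickled,
    mimicking how SparkContext.__getnewargs__ raises in PySpark."""

    def __getnewargs__(self):
        raise Exception("It appears that you are attempting to reference "
                        "SparkContext from a broadcast variable, action, "
                        "or transformation.")


class UpdateFn:
    """Hypothetical callable that, like `test`, keeps a reference to the
    context so it can re-broadcast later."""

    def __init__(self, ctx):
        self.ctx = ctx

    def __call__(self, value):
        return value + 1


def try_pickle(obj):
    """Return "ok" if obj can be pickled, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except Exception as exc:
        return "failed: " + str(exc)


# Fails: serializing the callable also serializes the captured context.
print(try_pickle(UpdateFn(DriverOnlyContext())))
# Succeeds: nothing driver-only is captured.
print(try_pickle(UpdateFn(None)))
```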

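One possible restructuring, sketched under the assumption that the lazily instantiated singleton pattern from the Spark Streaming programming guide (which notes that broadcast variables cannot be recovered from a checkpoint) applies here: take the SparkContext from `rdd.context` inside the callback instead of using the global `sc`, and keep the current broadcast in `globals()` under a string key so it is not captured when the function is pickled. `get_events`, `update_events` and the `"events_broadcast"` key are hypothetical names; 25 is the initial value from the script above. The block needs no imports; it only relies on `RDD.context`, `SparkContext.broadcast`, `Broadcast.value` and `Broadcast.unpersist`.

```python
def get_events(spark_context, initial_value=25):
    """Return the current events broadcast, creating it on first use
    (or after a driver restart, since it is not restored from the
    checkpoint). Hypothetical helper."""
    if "events_broadcast" not in globals():
        globals()["events_broadcast"] = spark_context.broadcast(initial_value)
    return globals()["events_broadcast"]


def update_events(rdd):
    """foreachRDD callback (hypothetical): read, print and increment the
    broadcast value.

    The SparkContext comes from rdd.context, and the broadcast is looked up
    by string key, so pickling this function for the checkpoint does not
    capture the global `sc`.
    """
    spark_context = rdd.context
    events = get_events(spark_context)
    num = events.value
    print(num)

    # Drop the old copies on the executors, then publish the new value.
    events.unpersist()
    globals()["events_broadcast"] = spark_context.broadcast(num + 1)
```

With these helpers, the script would register `alert_stream.foreachRDD(update_events)` in place of `test`.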