Re: Fatal error when using broadcast variables and checkpointing in Spark Streaming

2016-07-22 Thread Joe Panciera
I realized that there's an error in the code. Corrected:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         1)

events = sc.broadcast(25)


def test(rdd):
    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
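
For anyone who hits the same wall: the Spark Streaming programming guide
points out that broadcast variables cannot be recovered from a checkpoint
and suggests a lazily instantiated singleton that recreates the broadcast
on demand (its example is in Scala). Below is a rough, untested Python
translation of that idea; get_events and update_events are illustrative
helper names, not PySpark APIs. The point is that the function pulls the
context off the RDD at call time instead of closing over the driver's sc,
so nothing in the checkpointed closure references SparkContext.

_events_bc = [None]  # lazily created singleton broadcast


def get_events(sc):
    # Create the broadcast on first use (and again after recovery from
    # a checkpoint, when the singleton starts out empty).
    if _events_bc[0] is None:
        _events_bc[0] = sc.broadcast(25)
    return _events_bc[0]


def update_events(sc, value):
    if _events_bc[0] is not None:
        _events_bc[0].unpersist()
    _events_bc[0] = sc.broadcast(value)


def test(rdd):
    sc = rdd.context  # fetched at call time, not captured in the closure
    num = get_events(sc).value
    print num
    update_events(sc, num + 1)


alert_stream.foreachRDD(test)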


On Fri, Jul 22, 2016 at 1:50 PM, Joe Panciera wrote:

> Hi,
>
> I'm attempting to use broadcast variables to update stateful values used
> across the cluster for processing. Essentially, I have a function that is
> executed in .foreachRDD that updates the broadcast variable by calling
> unpersist() and then rebroadcasting. This works without issues when I
> execute the code without checkpointing, but as soon as I include
> checkpointing it seems to be unable to pickle the function. I get this
> error:
>
> *It appears that you are attempting to reference SparkContext from a
> broadcast *
>
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
> 268, in __getnewargs__
> "It appears that you are attempting to reference SparkContext from a
> broadcast "
> Exception: It appears that you are attempting to reference SparkContext
> from a broadcast variable, action, or transformation. SparkContext can only
> be used on the driver, not in code that it run on workers. For more
> information, see SPARK-5063.
>
> at
> org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
> at
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> ... 61 more
>
>
> Here's some simple code that shows this occurring:
>
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
>
>
>
> sc = SparkContext(appName="FileAutomation")
>
> # Create streaming context from existing spark context
> ssc = StreamingContext(sc, 10)
> alert_stream = KinesisUtils.createStream(ssc,
>                                          "Events",      # App Name
>                                          "Event_Test",  # Stream Name
>                                          "https://kinesis.us-west-2.amazonaws.com",
>                                          "us-west-2",
>                                          InitialPositionInStream.LATEST,
>                                          1)
>
> events = sc.broadcast(25)
>
>
> def test(rdd):
>     global events
>     num = events.value
>     print num
>
>     events.unpersist()
>     events = sc.broadcast(num + 1)
>
>
> events.foreachRDD(test)
>
> # Comment this line and no error occurs
> ssc.checkpoint('dir')
> ssc.start()
> ssc.awaitTermination()
>
>


Fatal error when using broadcast variables and checkpointing in Spark Streaming

2016-07-22 Thread Joe Panciera
Hi,

I'm attempting to use broadcast variables to update stateful values used
across the cluster for processing. Essentially, I have a function that is
executed in .foreachRDD that updates the broadcast variable by calling
unpersist() and then rebroadcasting. This works without issues when I
execute the code without checkpointing, but as soon as I include
checkpointing it seems to be unable to pickle the function. I get this
error:

*It appears that you are attempting to reference SparkContext from a
broadcast *

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
268, in __getnewargs__
"It appears that you are attempting to reference SparkContext from a
broadcast "
Exception: It appears that you are attempting to reference SparkContext
from a broadcast variable, action, or transformation. SparkContext can only
be used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.

at
org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
at
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 61 more


Here's some simple code that shows this occurring:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         1)

events = sc.broadcast(25)


def test(rdd):
    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


events.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
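
A note on why the checkpoint() call makes the difference: enabling
checkpointing forces Spark to serialize the DStream graph, including the
function handed to foreachRDD (that is what the
PythonTransformFunctionSerializer frames in the stack trace are doing).
The function here closes over the driver's sc via sc.broadcast(num + 1),
and PySpark's SparkContext is deliberately unpicklable; the SPARK-5063
guard in context.py's __getnewargs__ shown in the trace raises as soon as
the pickler reaches it. The same failure can be reproduced outside
streaming with a sketch like the following (untested, and note that
pyspark's bundled cloudpickle module is an internal detail):

from pyspark import SparkContext, cloudpickle

sc = SparkContext(appName="PickleDemo")


def closes_over_sc(rdd):
    return sc.broadcast(1)  # references the driver-side SparkContext


# cloudpickle serializes the function by value, reaches the captured sc,
# and SparkContext.__getnewargs__ raises the SPARK-5063 exception.
cloudpickle.dumps(closes_over_sc)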