Here it is. How do I access a broadcastVar in a function that's in another
module (process_stuff.py below):

Thanks,
Vadim

main.py
-------

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):
    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):
        jsondf = sqlContext.jsonRDD(rdd)
        #jsondf.printSchema()
        jsondf.registerTempTable('mytable')
        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  # I want myfunc to see mylist from above -- how?

...

process_stuff.py
----------------------

def myfunc(x):
    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?

...


metadata.py
----------------

def get_metadata():
    ...
    return mylist
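One way to resolve the NameError, sketched below: instead of relying on a module-level global, give myfunc a second parameter and close over the broadcast variable at the call site, e.g. stuff.map(lambda r: myfunc(r, broadcastVar)). This is a sketch, not a confirmed fix from the thread; the FakeBroadcast class is a hypothetical stand-in for pyspark.broadcast.Broadcast so the pattern can be run without a cluster, and the sample data is invented.

```python
# Hypothetical stand-in for pyspark.broadcast.Broadcast, so this sketch
# runs without Spark. In real code: broadcastVar = sc.broadcast(mylist).
class FakeBroadcast(object):
    def __init__(self, value):
        self.value = value

# process_stuff.py -- myfunc takes the broadcast as an argument, no global:
def myfunc(x, bv):
    metadata = bv.value            # bv arrives via the closure, not a global
    return (x, metadata[0])

# main.py -- close over broadcastVar when mapping:
mylist = ['meta1', 'meta2']                    # invented sample metadata
broadcastVar = FakeBroadcast(mylist)           # really: sc.broadcast(mylist)
rows = ['a', 'b']                              # really: rows from sqlContext.sql(...)
# really: stuff_mapped = stuff.map(lambda r: myfunc(r, broadcastVar))
stuff_mapped = [myfunc(r, broadcastVar) for r in rows]
print(stuff_mapped)                            # [('a', 'meta1'), ('b', 'meta1')]
```

An alternative, if myfunc's signature cannot change, is to set the attribute on the imported module from the driver before the stream starts (process_stuff.broadcastVar = broadcastVar); passing it explicitly is the less fragile of the two.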

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:

> Can you give full code? especially the myfunc?
>
> On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> Here's what I did:
>>
>> print 'BROADCASTING...'
>> broadcastVar = sc.broadcast(mylist)
>> print broadcastVar
>> print broadcastVar.value
>> print 'FINISHED BROADCASTING...'
>>
>> The above works fine,
>>
>> but when I call myrdd.map(myfunc) I get *NameError: global name
>> 'broadcastVar' is not defined*
>>
>> The myfunc function is in a different module. How do I make it aware of
>> broadcastVar?
>>
>> On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> Great. Will try to modify the code. Always room to optimize!
>>>
>>> On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Absolutely. The same code would work for local as well as distributed
>>>> mode!
>>>>
>>>> On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy <
>>>> vadim.bichuts...@gmail.com> wrote:
>>>>
>>>>> Can I use broadcast vars in local mode?
>>>>>
>>>>> On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Yep. Not efficient. Pretty bad actually. That's why broadcast
>>>>>> variables were introduced right at the very beginning of Spark.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy <
>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks TD. I was looking into broadcast variables.
>>>>>>>
>>>>>>> Right now I am running it locally...and I plan to move it to
>>>>>>> "production" on EC2.
>>>>>>>
>>>>>>> The way I fixed it is by doing myrdd.map(lambda x: (x,
>>>>>>> mylist)).map(myfunc) but I don't think it's efficient?
>>>>>>>
>>>>>>> mylist is filled only once at the start and never changes.
>>>>>>>
>>>>>>> Vadim
>>>>>>>
>>>>>>> On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das <t...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is the mylist present on every executor? If not, then you have to
>>>>>>>> pass it on. And broadcasts are the best way to pass them on. But note
>>>>>>>> that once broadcast, it is immutable at the executors, and if you
>>>>>>>> update the list at the driver, you will have to broadcast it again.
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2015 at 9:28 AM, Vadim Bichutskiy <
>>>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am using Spark Streaming with Python. For each RDD, I call a
>>>>>>>>> map, i.e., myrdd.map(myfunc), myfunc is in a separate Python module. 
>>>>>>>>> In yet
>>>>>>>>> another separate Python module I have a global list, i.e. mylist,
>>>>>>>>> that's populated with metadata. I can't get myfunc to see 
>>>>>>>>> mylist...it's
>>>>>>>>> always empty. Alternatively, I guess I could pass mylist to map.
>>>>>>>>>
>>>>>>>>> Any suggestions?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Vadim
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
