On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <[email protected]> wrote:
> Hi,
> Thanks for the response..
> In the second case, f2,
> foo will have to be declared globally, right?
>
> My function is something like:
> def indexing(splitIndex, iterator):
>     count = 0
>     offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>     indexed = []
>     for i, e in enumerate(iterator):
>         index = count + offset + i
>         for j, ele in enumerate(e):
>             indexed.append((index, j, ele))
>     yield indexed
In this function `indexing`, `offset_lists` should be global.
> def another_funct(offset_lists):
>     # get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
> But then, the issue is: how does indexing get offset_lists?
> Any suggestions?
Basically, you can do what you would do in a normal Python program: PySpark
will send the global variables or closures that your function references to
the worker processes automatically. So, you can pass it as an extra argument
through a lambda:
def indexing(splitIndex, iterator, offset_lists):
    pass

def another_func(offset_lists):
    rdd.mapPartitionsWithSplit(
        lambda index, it: indexing(index, it, offset_lists))
Or:
def indexing(splitIndex, iterator):
    # access offset_lists (read from the module-level global)
    pass

def another_func(offset):
    global offset_lists
    offset_lists = offset  # set the global before the job is submitted
    rdd.mapPartitionsWithSplit(indexing)
Or:
def another_func(offset_lists):
    def indexing(index, iterator):
        # access offset_lists
        pass
    rdd.mapPartitionsWithIndex(indexing)
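
To make the last variant concrete, here is a minimal sketch that you can run
without a cluster. It assumes offset_lists holds the number of rows in each
partition, and it yields one tuple at a time instead of a single list. The
names make_indexer and run_partitions are made up for this illustration;
run_partitions is only a local stand-in that calls f(index, iterator) once per
partition and is not part of the Spark API.

```python
from itertools import accumulate


def make_indexer(offset_lists):
    """Return a two-argument callback that closes over offset_lists."""
    # Row number at which each partition starts: [0, n0, n0 + n1, ...]
    starts = [0] + list(accumulate(offset_lists))

    def indexing(split_index, iterator):
        offset = starts[split_index]
        for i, row in enumerate(iterator):
            for j, ele in enumerate(row):
                # (global row index, column index, element)
                yield (offset + i, j, ele)

    return indexing


def run_partitions(partitions, f):
    """Hypothetical local stand-in: call f(index, iterator) per partition."""
    out = []
    for split_index, part in enumerate(partitions):
        out.extend(f(split_index, iter(part)))
    return out


# Two partitions holding 2 rows and 1 row respectively (made-up data).
partitions = [[["a", "b"], ["c"]], [["d"]]]
offset_lists = [len(p) for p in partitions]

result = run_partitions(partitions, make_indexer(offset_lists))
print(result)
```

With a real RDD the call would be
rdd.mapPartitionsWithIndex(make_indexer(offset_lists)); the closure, including
offset_lists, should be shipped to the workers along with the function.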
>
> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <[email protected]> wrote:
>>
>> The callback function f only accepts 2 arguments. If you want to pass
>> other objects to it, you need a closure, such as:
>>
>> foo = xxx
>> def f(index, iterator, foo):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> You can also make f itself a closure over foo:
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <[email protected]>
>> wrote:
>> > Hi,
>> > In this example:
>> >
>> > http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let's say f takes three arguments:
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> > Now, how do I send this foo parameter to this method?
>> > rdd.mapPartitionsWithSplit(f)
>> > Thanks
>> >
>
>