Building on what Davies Liu said, how about something like:
def indexing(splitIndex, iterator, offset_lists):
    count = 0
    # Number of rows in all earlier partitions.
    offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
    indexed = []
    for i, e in enumerate(iterator):
        index = count + offset + i
        for j, ele in enumerate(e):
            indexed.append((index, j, ele))
    yield indexed
def another_funct(offset_lists):
    # get that damn offset_lists
    # rdd.mapPartitionsWithSplit(indexing)
    rdd.mapPartitionsWithSplit(
        lambda index, it: indexing(index, it, offset_lists))
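If you would rather not write the lambda, functools.partial does the same binding. Below is a minimal sketch that runs without Spark. Note that map_partitions_with_index is a hypothetical stand-in I made up to imitate how the RDD method calls f(index, iterator) per partition, and the sample data is invented. This variant also yields the tuples one at a time instead of a single list, which is an assumption about what you want.

```python
from functools import partial

def indexing(split_index, iterator, offset_lists):
    # Rows held by earlier partitions decide where this partition's numbering starts.
    offset = sum(offset_lists[:split_index])
    for i, row in enumerate(iterator):
        for j, ele in enumerate(row):
            yield (offset + i, j, ele)

def map_partitions_with_index(partitions, f):
    # Hypothetical stand-in for the RDD method: calls f(index, iterator)
    # once per partition and concatenates whatever f yields.
    out = []
    for index, part in enumerate(partitions):
        out.extend(f(index, iter(part)))
    return out

# Invented sample data: two partitions holding 2 rows and 1 row.
partitions = [[["a", "b"], ["c"]], [["d", "e"]]]
offset_lists = [len(p) for p in partitions]

# partial fixes the third argument, leaving a two-argument callback.
bound = partial(indexing, offset_lists=offset_lists)
result = map_partitions_with_index(partitions, bound)
```

With a real RDD, the same bound callable should be usable in place of the lambda, since it still takes just (index, iterator).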
P.S. In the indexing function, the count variable stays at 0 the whole
time, so it probably isn't doing anything?
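To illustrate, here is a rough sketch with count dropped, where the starting index of each partition is computed once up front rather than summed inside every call. The helper start_offsets and the sample sizes are my own invention, and it assumes offset_lists holds the number of rows per partition.

```python
def start_offsets(partition_sizes):
    # Exclusive running total: entry k is the row count of partitions 0..k-1.
    starts, total = [], 0
    for size in partition_sizes:
        starts.append(total)
        total += size
    return starts

def indexing(split_index, iterator, starts):
    # No count variable: the global row index is just start + local position.
    start = starts[split_index]
    for i, row in enumerate(iterator):
        for j, ele in enumerate(row):
            yield (start + i, j, ele)

# Invented sizes: three partitions with 2, 1 and 3 rows.
starts = start_offsets([2, 1, 3])
# Index the first two rows of the third partition (split index 2).
rows = list(indexing(2, iter([["x"], ["y", "z"]]), starts))
```

The indices come out the same as with sum(offset_lists[:splitIndex]); the only difference is that the sum is not recomputed per partition.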
On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <[email protected]>
wrote:
> Hi,
> Thanks for the response.
> In the second case (f2), foo would have to be declared globally, right?
>
> My function is something like:
> def indexing(splitIndex, iterator):
>     count = 0
>     offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
>     indexed = []
>     for i, e in enumerate(iterator):
>         index = count + offset + i
>         for j, ele in enumerate(e):
>             indexed.append((index, j, ele))
>     yield indexed
>
> def another_funct(offset_lists):
>     # get that damn offset_lists
>     rdd.mapPartitionsWithSplit(indexing)
> But then, the issue is how to get offset_lists into indexing.
> Any suggestions?
>
>
> On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <[email protected]>
> wrote:
>
>> The callback function f only accepts 2 arguments. If you want to pass
>> other objects to it, you need a closure, such as:
>>
>> foo = xxx
>> def f(index, iterator, foo):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
>>
>> You can also make f itself a closure:
>>
>> def f2(index, iterator):
>>     yield (index, foo)
>> rdd.mapPartitionsWithIndex(f2)
>>
>> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <[email protected]>
>> wrote:
>> > Hi,
>> > In this example:
>> >
>> http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
>> > Let's say f takes three arguments:
>> > def f(splitIndex, iterator, foo): yield splitIndex
>> > Now, how do I send this foo parameter to this method?
>> > rdd.mapPartitionsWithSplit(f)
>> > Thanks
>> >
>>
>
>
--
Mohit
"When you want success as badly as you want the air, then you will get it.
There is no other secret of success."
-Socrates