Has anyone tried using functools.partial (
https://docs.python.org/2/library/functools.html#functools.partial) with
PySpark?  If it works, it might be a nice way to address this use-case.


On Sun, Aug 17, 2014 at 7:35 PM, Davies Liu <dav...@databricks.com> wrote:

> On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com>
> wrote:
> > Hi,
> >   Thanks for the response..
> > In the second case f2??
> > foo will have to be declared globablly??right??
> >
> > My function is somthing like:
> > def indexing(splitIndex, iterator):
> >   count = 0
> >   offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
> >   indexed = []
> >   for i, e in enumerate(iterator):
> >     index = count + offset + i
> >     for j, ele in enumerate(e):
> >       indexed.append((index, j, ele))
> >   yield indexed
>
> In this function `indexing`, `offset_lists` should be global.
>
> > def another_funct(offset_lists):
> >     #get that damn offset_lists
> >     rdd.mapPartitionsWithSplit(indexing)
> > But then, the issue is that offset_lists?
> > Any suggestions?
>
> Basically, you can do what you do in normal Python program, PySpark
> will send the global variables or closures to worker processes
> automatically.
>
> So, you can :
>
> def indexing(splitIndex, iterator, offset_lists):
>      pass
>
> def another_func(offset_lists):
>      rdd.mapPartitionsWithSplit(lambda index, it: indexing(index, it,
> offset_lists))
>
> Or:
>
> def indexing(splitIndex, iterrator):
>      # access offset_lists
>
> def another_func(offset):
>      global offset_lists
>      offset_lists = offset
>      rdd. mapPartitionsWithSplit(indexing)
>
> Or:
>
> def another_func(offset_lists):
>       def indexing(index, iterator):
>           # access offset_lists
>           pass
>       rdd.mapPartitionsWithIndex(indexing)
>
>
> >
> > On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com>
> wrote:
> >>
> >> The callback function f only accept 2 arguments, if you want to pass
> >> another objects to it, you need closure, such as:
> >>
> >> foo=xxx
> >> def f(index, iterator, foo):
> >>      yield (index, foo)
> >> rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
> >>
> >> also you can make f become `closure`:
> >>
> >> def f2(index, iterator):
> >>     yield (index, foo)
> >> rdd.mapPartitionsWithIndex(f2)
> >>
> >> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com>
> >> wrote:
> >> > Hi,
> >> >   In this example:
> >> >
> >> >
> http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
> >> > Let say, f takes three arguments:
> >> > def f(splitIndex, iterator, foo): yield splitIndex
> >> > Now, how do i send this foo parameter to this method?
> >> > rdd.mapPartitionsWithSplit(f)
> >> > Thanks
> >> >
> >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to