Has anyone tried using functools.partial (https://docs.python.org/2/library/functools.html#functools.partial) with PySpark? If it works, it might be a nice way to address this use-case.
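If it does work, a minimal sketch might look like the following (untested; it assumes PySpark's serializer can handle partial objects, and the names f/foo are just placeholders from the thread below):

    from functools import partial

    from pyspark import SparkContext

    def f(splitIndex, iterator, foo):
        # Emit one (partition index, foo) pair per partition.
        yield (splitIndex, foo)

    sc = SparkContext(appName="partial-sketch")
    rdd = sc.parallelize(range(10), 4)

    # partial binds foo as a keyword argument, leaving exactly the
    # (index, iterator) signature that mapPartitionsWithIndex expects.
    print(rdd.mapPartitionsWithIndex(partial(f, foo="bar")).collect())
    # e.g. [(0, 'bar'), (1, 'bar'), (2, 'bar'), (3, 'bar')]

One nicety over a lambda is that f stays a plain top-level function with the extra parameter spelled out in its signature.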
On Sun, Aug 17, 2014 at 7:35 PM, Davies Liu <dav...@databricks.com> wrote:
> On Sun, Aug 17, 2014 at 11:21 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> > Hi,
> >   Thanks for the response. In the second case (f2), foo will have to be
> > declared globally, right?
> >
> > My function is something like:
> >
> >     def indexing(splitIndex, iterator):
> >         count = 0
> >         offset = sum(offset_lists[:splitIndex]) if splitIndex else 0
> >         indexed = []
> >         for i, e in enumerate(iterator):
> >             index = count + offset + i
> >             for j, ele in enumerate(e):
> >                 indexed.append((index, j, ele))
> >         yield indexed
>
> In this function `indexing`, `offset_lists` should be global.
>
> >     def another_funct(offset_lists):
> >         # get that damn offset_lists
> >         rdd.mapPartitionsWithSplit(indexing)
> >
> > But then, the issue is how to get offset_lists into indexing.
> > Any suggestions?
>
> Basically, you can do what you would do in a normal Python program; PySpark
> will send the global variables or closures to the worker processes
> automatically.
>
> So, you can:
>
>     def indexing(splitIndex, iterator, offset_lists):
>         pass
>
>     def another_func(offset_lists):
>         rdd.mapPartitionsWithSplit(
>             lambda index, it: indexing(index, it, offset_lists))
>
> Or:
>
>     def indexing(splitIndex, iterator):
>         # access offset_lists
>         pass
>
>     def another_func(offset):
>         global offset_lists
>         offset_lists = offset
>         rdd.mapPartitionsWithSplit(indexing)
>
> Or:
>
>     def another_func(offset_lists):
>         def indexing(index, iterator):
>             # access offset_lists
>             pass
>         rdd.mapPartitionsWithIndex(indexing)
>
> > On Sun, Aug 17, 2014 at 11:15 AM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> The callback function f only accepts 2 arguments. If you want to pass
> >> other objects to it, you need a closure, such as:
> >>
> >>     foo = xxx
> >>     def f(index, iterator, foo):
> >>         yield (index, foo)
> >>     rdd.mapPartitionsWithIndex(lambda index, it: f(index, it, foo))
> >>
> >> You can also make f a closure:
> >>
> >>     def f2(index, iterator):
> >>         yield (index, foo)
> >>     rdd.mapPartitionsWithIndex(f2)
> >>
> >> On Sun, Aug 17, 2014 at 10:25 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> >> > Hi,
> >> >   In this example:
> >> > http://www.cs.berkeley.edu/~pwendell/strataconf/api/pyspark/pyspark.rdd.RDD-class.html#mapPartitionsWithSplit
> >> >
> >> > Let's say f takes three arguments:
> >> >
> >> >     def f(splitIndex, iterator, foo): yield splitIndex
> >> >
> >> > Now, how do I send this foo parameter to this method?
> >> >
> >> >     rdd.mapPartitionsWithSplit(f)
> >> >
> >> > Thanks
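Coming back to Chengi's concrete use-case: functools.partial composes with Davies' first suggestion, so offset_lists can be passed in explicitly rather than through a global. A rough, untested sketch (the sample data, the up-front count pass, and the simplified indexing body are mine, not from the thread):

    from functools import partial

    from pyspark import SparkContext

    def indexing(splitIndex, iterator, offset_lists):
        # Rows in earlier partitions determine this partition's starting index.
        offset = sum(offset_lists[:splitIndex])
        for i, row in enumerate(iterator):
            for j, ele in enumerate(row):
                yield (offset + i, j, ele)

    sc = SparkContext(appName="indexing-sketch")
    rdd = sc.parallelize([[10, 20], [30], [40, 50]], 2)

    # Per-partition row counts, gathered up front with a cheap first pass.
    offset_lists = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

    indexed = rdd.mapPartitionsWithIndex(
        partial(indexing, offset_lists=offset_lists))
    print(indexed.collect())
    # e.g. [(0, 0, 10), (0, 1, 20), (1, 0, 30), (2, 0, 40), (2, 1, 50)]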