[ 
https://issues.apache.org/jira/browse/SPARK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Lau updated SPARK-15611:
-------------------------------
    Description: 
Hi, I'm running the code below:

{code:python}
from random import random
from operator import add

def funcx(x):
  # Print the generated point, then test whether it falls inside the unit circle.
  print x[0], x[1]
  return 1 if x[0] ** 2 + x[1] ** 2 < 1 else 0

def genRnd(ind):
  x = random() * 2 - 1
  y = random() * 2 - 1
  return (x, y)

def runsp(total):
  # Monte Carlo estimate of pi; a single partition makes the repetition obvious.
  ret = sc.parallelize(xrange(total), 1).map(genRnd).map(funcx) \
          .reduce(lambda x, y: x + y) / float(total) * 4
  print ret

runsp(3)
{code}

Once the PySpark shell is started, no matter how many times I run runsp(N), the code always produces the same sequence of random numbers, like this:

{noformat}
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
{noformat}
I think this is because when daemon.py imports pyspark.worker, the random module is also imported (via shuffle.py, which pyspark.worker imports), so the generator is initialized in the daemon process. Each worker forked by "pid = os.fork()" keeps the parent's random state, so every forked worker draws the same sequence of values from random().
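To illustrate the fork behaviour outside of Spark, here is a minimal sketch of my own (not Spark code), assuming a Unix system where os.fork() is available; without re-seeding, every child inherits the same PRNG state and prints the same value:

{code:python}
import os
import random

# Draw once in the parent so it has a definite PRNG state for the children to inherit,
# much like the random module being imported and used in the daemon process.
random.random()

for i in range(2):
  pid = os.fork()
  if pid == 0:
    # Child: inherits the parent's PRNG state, so both children print the same number.
    # Uncommenting the next line would give each child its own independent stream.
    # random.seed()
    print "child %d: %f" % (i, random.random())
    os._exit(0)
  os.waitpid(pid, 0)
{code}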

Re-seeding the generator with random.seed() in the forked worker solves the problem, but this may not be the proper fix.
Thanks.
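As a user-level workaround (not a fix in daemon.py), re-seeding inside the task itself also avoids the inherited state. A sketch along those lines, reusing funcx from above; the names genRndPartition and runsp_reseeded are only illustrative:

{code:python}
import random

def genRndPartition(index, iterator):
  # Re-seed from OS entropy inside the forked worker before drawing any numbers.
  random.seed()
  for _ in iterator:
    yield (random.random() * 2 - 1, random.random() * 2 - 1)

def runsp_reseeded(total):
  ret = sc.parallelize(xrange(total), 1) \
          .mapPartitionsWithIndex(genRndPartition) \
          .map(funcx) \
          .reduce(lambda a, b: a + b) / float(total) * 4
  print ret
{code}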


> Each forked worker in daemon.py keeps the parent's random state
> ---------------------------------------------------------------
>
>                 Key: SPARK-15611
>                 URL: https://issues.apache.org/jira/browse/SPARK-15611
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.6.1
>            Reporter: Thomas Lau
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
