[ https://issues.apache.org/jira/browse/SPARK-15611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Lau updated SPARK-15611:
-------------------------------
Description:

Hi, I'm writing some code as below:

```py
from random import random
from operator import add

def funcx(x):
    # print the sampled point, then count it if it falls inside the unit circle
    print x[0], x[1]
    return 1 if x[0]**2 + x[1]**2 < 1 else 0

def genRnd(ind):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return (x, y)

def runsp(total):
    # Monte Carlo estimate of pi
    ret = sc.parallelize(xrange(total), 1).map(genRnd).map(funcx) \
            .reduce(lambda x, y: x + y) / float(total) * 4
    print ret

runsp(3)
```

Once the pyspark shell has started, no matter how many times I run "runsp(N)", this code always produces the same sequence of random numbers, like this:

```sh
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
```

I think this is because when daemon.py imports pyspark.worker, the random module is also imported via shuffle.py (which pyspark.worker imports). A worker forked by "pid = os.fork()" then inherits the parent's random state, so every forked worker produces the same sequence from random(). Re-seeding with random.seed() after the fork solves the problem, but I'm not sure that is the proper fix.

Thanks.
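For illustration, here is a minimal standalone sketch (plain Python, not PySpark itself) of the behaviour described above: children forked with os.fork() inherit the parent's random state and produce identical values unless they re-seed after the fork, for example with their own pid.

```py
import os
import random

# The parent advances the random module once (much as importing shuffle.py would),
# then forks children. Children that do not re-seed inherit the parent's PRNG
# state and print the same value; children that re-seed print different ones.
random.random()

def child(reseed):
    if reseed:
        random.seed(os.getpid())  # re-seed after fork, e.g. with the child's pid
    print("reseed=%s pid=%d value=%.12f" % (reseed, os.getpid(), random.random()))
    os._exit(0)

for reseed in (False, False, True, True):
    pid = os.fork()
    if pid == 0:
        child(reseed)
    os.waitpid(pid, 0)
# the two reseed=False children print the same value; the reseed=True ones differ
```

As a possible user-level workaround (a sketch only, not the daemon.py fix discussed above; `sc` and `xrange` come from the pyspark shell session as in the report, and `gen_points` / `runsp_seeded` are hypothetical names), the task itself can re-seed per partition so each forked worker draws an independent stream:

```py
import os
import random
import time

def gen_points(split_index, iterator):
    # re-seed inside the task, mixing pid, time and partition index,
    # so a forked worker does not reuse the parent's PRNG state
    random.seed((os.getpid() << 16) ^ int(time.time()) ^ split_index)
    for _ in iterator:
        yield (random.random() * 2 - 1, random.random() * 2 - 1)

def runsp_seeded(total):
    inside = (sc.parallelize(xrange(total), 1)
                .mapPartitionsWithIndex(gen_points)
                .map(lambda p: 1 if p[0] ** 2 + p[1] ** 2 < 1 else 0)
                .sum())
    print(4.0 * inside / total)
```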
> each forked worker in daemon.py keeps the parent's random state
> ---------------------------------------------------------------
>
>                 Key: SPARK-15611
>                 URL: https://issues.apache.org/jira/browse/SPARK-15611
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.6.1
>            Reporter: Thomas Lau
>            Priority: Minor
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org