[ https://issues.apache.org/jira/browse/SPARK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-6886:
-----------------------------------

    Assignee: Davies Liu  (was: Apache Spark)

> Big closure in PySpark will fail during shuffle
> -----------------------------------------------
>
>                 Key: SPARK-6886
>                 URL: https://issues.apache.org/jira/browse/SPARK-6886
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.2.1, 1.3.0, 1.4.0
>            Reporter: Davies Liu
>            Assignee: Davies Liu
>            Priority: Blocker
>
> Reported by beifei.zhou <beifei.zhou at ximalaya.com>:
> I am using Spark to process big datasets. However, executing reduceByKey on a large dataset always fails, while the same job succeeds on a smaller one. May I ask how I could solve this issue?
> The error is always like this:
> {code}
> 15/04/09 11:27:46 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 5)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/Users/nali/Softwares/spark/python/pyspark/worker.py", line 90, in main
>     command = pickleSer.loads(command.value)
>   File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 106, in value
>     self._value = self.load(self._path)
>   File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 87, in load
>     with open(path, 'rb', 1 << 20) as f:
> IOError: [Errno 2] No such file or directory: '/private/var/folders/_x/n59vb1b54pl96lvldz2lr_v40000gn/T/spark-37d8ecbc-9ac9-4aa2-be23-12823f4cd1ed/pyspark-1e3d5904-a5b6-4222-a146-91bfdb4a33a7/tmp8XMhgG'
> {code}
> Here I attach my code:
> {code}
> import codecs
> import operator
> from operator import add
>
> from pyspark import SparkContext, SparkConf
> from pyspark.storagelevel import StorageLevel
>
>
> def combine_dict(a, b):
>     a.update(b)
>     return a
>
> conf = SparkConf()
> sc = SparkContext(appName="tag")
>
> al_tag_dict = sc.textFile('albumtag.txt')\
>     .map(lambda x: x.split(','))\
>     .map(lambda x: {x[0]: x[1:]})\
>     .reduce(lambda a, b: combine_dict(a, b))
>
> result = sc.textFile('uidAlbumscore.txt')\
>     .map(lambda x: x.split(','))\
>     .filter(lambda x: x[1] in al_tag_dict.keys())\
>     .map(lambda x: (x[0], al_tag_dict[x[1]], float(x[2])))\
>     .map(lambda x: map(lambda a: ((x[0], a), x[2]), x[1]))\
>     .flatMap(lambda x: x)\
>     .map(lambda x: (str(x[0][0]), x[1]))\
>     .reduceByKey(add)
>     # .map(lambda x: x[0][0]+','+x[0][1]+','+str(x[1])+'\n')\
>     # .reduce(add)
>
> # codecs.open('tag_score.txt', 'w', 'utf-8').write(result)
> print result.first()
> {code}
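The traceback shows the executor failing to read the temporary file behind a broadcast while unpickling the task command: when the pickled closure is big, PySpark ships it to workers through a broadcast, and here that file has gone missing on the executor. Independent of the fix in Spark itself, here is a minimal sketch of a common workaround, assuming the reporter's file names and the Python 2 API of Spark 1.x: ship the large dict as an explicit broadcast variable, so each task's pickled closure stays small.

{code}
from operator import add

from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(appName="tag", conf=conf)

def combine_dict(a, b):
    a.update(b)
    return a

# Build the album -> tags dict on the driver, as in the original job.
album_tags = sc.textFile('albumtag.txt')\
    .map(lambda x: x.split(','))\
    .map(lambda x: {x[0]: x[1:]})\
    .reduce(combine_dict)

# Explicit broadcast: the dict is fetched once per executor and read via
# .value, instead of being pickled into every task's closure.
al_tag_dict = sc.broadcast(album_tags)

result = sc.textFile('uidAlbumscore.txt')\
    .map(lambda x: x.split(','))\
    .filter(lambda x: x[1] in al_tag_dict.value)\
    .flatMap(lambda x: [((x[0], tag), float(x[2]))
                        for tag in al_tag_dict.value[x[1]]])\
    .map(lambda x: (str(x[0][0]), x[1]))\
    .reduceByKey(add)

print result.first()
{code}

The job logic is unchanged; only the way the dict reaches the workers differs. Capturing the small `Broadcast` handle in the lambdas is cheap, while the dict itself travels once per executor rather than once per task.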