Hello all, I am hitting an error in PySpark whose cause I cannot work out. All I can tell from the stack trace is that it fails to find a PySpark file at a path matching /mnt/spark-*/pyspark-*. Beyond that, I need someone more experienced with Spark to help diagnose the problem and suggest potential solutions, so I am turning to this group for help.
If anyone prefers to read the same question on Stack Overflow, here is the link: http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror

Here is the same thing pasted as raw text: I am trying to write a simple KNN job using PySpark on an HDFS cluster. I am using very few input files, so I don't think this is a memory (disk space) issue. I do not do an explicit broadcast anywhere in my code, so it surprises me that broadcast.py is what fails. I do, however, have Python dictionaries that the tasks effectively share without an explicit broadcast. Can anyone help me understand what is going on? I have attached my Python file and the stack trace to this email.

Thanks,
Nikhil
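For context, this is the explicit-broadcast pattern I am comparing my code against (a minimal, self-contained sketch with made-up data, not my actual job):

from pyspark import SparkContext

sc = SparkContext()

# Hypothetical small lookup table; in my real job the dicts are built from HDFS.
id_to_idx = {"a": 0, "b": 1, "c": 2}
id_to_idx_b = sc.broadcast(id_to_idx)

# Tasks read the shared dict through .value on the broadcast handle
# instead of capturing the dict itself in the closure.
rdd = sc.parallelize(["a", "b", "c", "a"])
counts = rdd.map(lambda k: (id_to_idx_b.value[k], 1)) \
            .reduceByKey(lambda x, y: x + y)
print counts.collect()

My (possibly wrong) understanding is that when the dicts are captured in a lambda's closure instead, PySpark pickles them into the serialized task, and a sufficiently large pickled closure is shipped through the broadcast machinery anyway, which might explain why broadcast.py appears in the trace even though I never call sc.broadcast on that path.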
from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import subprocess
import sys


def create_indices(inputdir):
    """Create user and item indices running from 0 to #users and 0 to #items
    respectively, so they can be stored in SparseVectors as dicts."""
    user_id_to_idx = dict()
    user_idx_to_id = dict()
    item_idx_to_id = dict()
    item_id_to_idx = dict()
    item_idx = 0
    user_idx = 0
    # Stream the input files off HDFS on the driver.
    cat = subprocess.Popen(["hadoop", "fs", "-cat",
                            "/user/hadoop/" + inputdir + "/*.txt"],
                           stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks = line.strip().split("\t")
        item_id = toks[0].strip()
        user_id = toks[1].strip()
        # strip() never returns None, so test for an empty string instead.
        if user_id and user_id not in user_id_to_idx:
            user_id_to_idx[user_id] = user_idx
            user_idx_to_id[user_idx] = user_id
            user_idx += 1
        if item_id and item_id not in item_id_to_idx:
            item_id_to_idx[item_id] = item_idx
            item_idx_to_id[item_idx] = item_id
            item_idx += 1
    return (user_idx_to_id, user_id_to_idx, item_idx_to_id, item_id_to_idx,
            user_idx, item_idx)


def concat_helper(a, b):
    # Merge two partial {user_idx: 1} dicts into a new dict.
    if a is not None and b is not None:
        temp = dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a is not None:
        return a
    else:
        return b


# Pass in the HDFS path to the input files and the Spark context.
def runKNN(inputdir, sc, user_id_to_idx, item_id_to_idx):
    rdd_text = sc.textFile(inputdir)
    new_rdd = None
    # Note: map() is lazy, so a KeyError from these dict lookups is raised
    # on the executors when an action runs, not inside this try/except.
    try:
        new_rdd = (rdd_text
                   .map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],
                                   {user_id_to_idx[str(x.strip().split("\t")[1])]: 1}))
                   .reduceByKey(concat_helper)
                   .sortByKey())
    except KeyError:
        print item_id_to_idx.keys()
    return new_rdd


if __name__ == "__main__":
    sc = SparkContext()
    (u_idx_to_id, u_id_to_idx, i_idx_to_id, i_id_to_idx,
     u_idx, i_idx) = create_indices(sys.argv[1])
    # Broadcast variables are created here, but the code below never reads
    # them via .value; the lambdas in runKNN capture the plain dicts instead.
    u_idx_to_id_b = sc.broadcast(u_idx_to_id)
    u_id_to_idx_b = sc.broadcast(u_id_to_idx)
    i_idx_to_id_b = sc.broadcast(i_idx_to_id)
    i_id_to_idx_b = sc.broadcast(i_id_to_idx)
    num_users = sc.broadcast(u_idx)
    num_items = sc.broadcast(i_idx)
    item_dict_rdd = runKNN(sys.argv[1], sc, u_id_to_idx, i_id_to_idx)
    item_dict_rdd_new = item_dict_rdd.map(
        lambda x: (x[0], SparseVector(i_idx, x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    # Pairwise dot products are not implemented yet; the save below must stay
    # commented out too, otherwise dot_products_rdd is referenced undefined.
    # dot_products_rdd = map(lambda (x, y): (x, y),
    #                        combinations(item_dict_rdd_new.map(lambda x: x), 2))
    # dot_products_rdd.saveAsTextFile("hdfs://output_path_2")
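In case it helps, here is how I would expect runKNN to look if it went through the broadcast handles explicitly (just a sketch of what I believe the intended usage is; user_id_to_idx_b and item_id_to_idx_b stand for the broadcast variables created in __main__):

def runKNN_broadcast(inputdir, sc, user_id_to_idx_b, item_id_to_idx_b):
    # .value dereferences the broadcast on the executors, so the closure
    # captures only the lightweight broadcast handles, not the dicts.
    rdd_text = sc.textFile(inputdir)
    return (rdd_text
            .map(lambda x: (item_id_to_idx_b.value[str(x.strip().split("\t")[0])],
                            {user_id_to_idx_b.value[str(x.strip().split("\t")[1])]: 1}))
            .reduceByKey(concat_helper)
            .sortByKey())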
(Attachment: stacktrace)