Hello all,
  I am hitting an error in pyspark whose cause I cannot work out. All I can 
tell from the stack trace is that it cannot find a pyspark file on the path 
/mnt/spark-*/pyspark-*. Beyond that, I need someone with more Spark experience 
than I have to look into it, help diagnose the problem, and suggest potential 
solutions, hence I am turning to this list for help.


If anyone wants to read the same question on Stack Overflow, here is the link:
http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror

Here's the same thing pasted as raw text:


I am trying to write a simple KNN job using pyspark on an HDFS cluster. I am 
using very few input files for the job, so I don't think it is a memory (space) 
issue. I do not do an explicit broadcast in any part of my code, so it is 
surprising to me that broadcast.py is what fails. I do, however, have Python 
dictionaries that the tasks read without my doing an explicit broadcast.
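
To make that last point concrete, here is a minimal sketch of the two patterns I 
mean (the names sc, lines and lookup are placeholders, not from my job): capturing 
a plain dict in a lambda's closure, which pyspark ships to the workers on its own, 
versus an explicit sc.broadcast whose .value is read on the workers. My job uses 
the first pattern:

    # Placeholder names: sc is a SparkContext, lines is an RDD of short strings.
    lookup = {"a": 0, "b": 1}          # plain Python dict built on the driver

    # Pattern 1: the dict is captured in the lambda's closure, so pyspark
    # serializes it and ships it with each task (no sc.broadcast call of mine).
    mapped = lines.map(lambda tok: lookup.get(tok, -1))

    # Pattern 2: explicit broadcast; workers read the shared copy via .value.
    lookup_b = sc.broadcast(lookup)
    mapped_b = lines.map(lambda tok: lookup_b.value.get(tok, -1))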

Can anyone help me understand what is going on?

I have appended my python file and the stack trace to this email.


Thanks,

Nikhil

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0

    # Stream the raw "<item>\t<user>" lines straight out of HDFS.
    cat=subprocess.Popen(["hadoop","fs","-cat","/user/hadoop/"+inputdir+"/*.txt"],stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks=line.strip().split("\t")
        item_id=toks[0].strip()
        user_id=toks[1].strip()
        # Assign the next free index to every user/item id we have not seen before.
        if user_id and user_id not in user_id_to_idx:
            user_id_to_idx[user_id]=user_idx
            user_idx_to_id[user_idx]=user_id
            user_idx+=1
        if item_id and item_id not in item_id_to_idx:
            item_id_to_idx[item_id]=item_idx
            item_idx_to_id[item_idx]=item_id
            item_idx+=1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

def concat_helper(a,b):
    # Merge two {user_idx: 1} dicts; either argument may be None.
    if a is not None and b is not None:
        temp=dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a is not None:
        return a
    else:
        return b

# Pass in the HDFS path to the input files and the Spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
    rdd_text=sc.textFile(inputdir)
    # Map each "<item>\t<user>" line to (item_idx,{user_idx:1}) and merge the
    # per-item user dicts. The lookup dicts are captured in the lambda's closure,
    # so they are shipped to the executors with the task; any KeyError will only
    # surface on the executors when an action runs, not here.
    new_rdd=rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],
                                    {user_id_to_idx[str(x.strip().split("\t")[1])]:1})) \
                    .reduceByKey(concat_helper) \
                    .sortByKey()
    return new_rdd

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])

    # Broadcast the lookup tables and counts. Note that runKNN below is still
    # handed the plain dicts, not these broadcast handles.
    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_id_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
    item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx)

    # Turn each per-item user dict into a SparseVector and save the result.
    item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(i_idx,x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    # The pairwise dot-product step is unfinished, so both lines stay commented out:
    #dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2))
    #dot_products_rdd.saveAsTextFile("hdfs://output_path_2")

Attachment: stacktrace
Description: stacktrace
