I implemented a simple KNN classifier. And i can run it successfully on a single sample, but it occurs an error when it is run on a test samples RDD. I attach the source code in attachment. Look forward for you replay! Best wishes to you!
The following is source code. import math from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint from pyspark.mllib._common import _dot class KNN(object): def __init__(self, data, k): ''' data: RDD of LabeledPoint ''' self._data = data self._k = k self._data.cache() def predict(self, x): topksamples = self._data.map(lambda point:(_dot(point.features, x)/math.sqrt(_dot(point.features,x)*_dot(point.features,x)), point.label)).sortByKey(False).top(self._k) labeldict = {} for score,label in topksamples: labeldict.setdefault(label, 0) labeldict[label] += 1 label = sorted([(label,count) for label,count in labeldict.items()], key=lambda x:x[1], reverse=True)[0][0] return label # Load and parse the data def parsePoint(line): values = [float(x) for x in line.split(' ')] return LabeledPoint(values[0], values[1:]) sc = SparkContext(appName="PythonLR") data = sc.textFile("file:///home/hadoop/spark/lr_data.txt") parsedData = data.map(parsePoint) # Build the model model = KNN(parsedData, 20) # Evaluating a single sample on training data print 'Predict lable is: %s ' % model.predict(parsedData.first().features) # Evaluating a samples set which is represented as a RDD testData = sc.textFile("file:///home/hadoop/spark/lr_data.txt") testData = testData.map(parsePoint) labelsAndPreds = testData.map(lambda p: model.predict(p.features)) print labelsAndPreds.collect() -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-RDD-in-RDD-transformation-tp10014.html Sent from the Apache Spark User List mailing list archive at Nabble.com.