Thanks Sean. Sorting definitely solves it, but I was hoping it could be avoided :)
In the documentation for Classification in MLlib, for example, zip() is used to create labelsAndPredictions:

-----
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2,
                                     categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
-----

The reason the zip() works here is that the RDD is loaded from a file. If it were generated by something that includes a join(), it would break due to this same issue. Maybe worth mentioning in the docs then?

Ofer

> On Mar 23, 2015, at 11:40 AM, Sean Owen <so...@cloudera.com> wrote:
>
> I think the explanation is that the join does not guarantee any order,
> since it causes a shuffle in general, and it is computed twice in the
> first example, resulting in a difference for d1 and d2.
>
> You can persist() the result of the join, and in practice I believe
> you'd find it behaves as expected, although even that is not 100%
> guaranteed, since a block could be lost and recomputed (differently).
>
> If order matters, and it does for zip(), then the reliable way to
> guarantee a well-defined ordering for zipping is to sort the RDDs.
>
> On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
> <omendelevi...@hortonworks.com> wrote:
>> Hi,
>>
>> I am running into a strange issue when doing a JOIN of two RDDs followed by
>> ZIP from PySpark.
>> It’s part of a more complex application, but I was able to narrow it down to a
>> simplified example that’s easy to replicate and causes the same problem to
>> appear:
>>
>> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
>> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: ','.join([x for x in pair[1]]))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>>
>> print x.take(10)
>>
>> The output is:
>>
>> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'),
>>  ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')]
>>
>> As you can see, the ordering of items is no longer preserved in all cases
>> (e.g., ‘v81’ is preserved, and ‘v45’ is not).
>> Is it not supposed to be preserved?
>>
>> If I do the same thing without the JOIN:
>>
>> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>>
>> print x.take(10)
>>
>> The output is:
>>
>> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
>>  ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>>
>> As expected.
>>
>> Anyone run into this or a similar issue?
>>
>> Ofer
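P.S. For the archives, here is a pure-Python sketch of the underlying issue and one way around it (no Spark required; plain lists stand in for RDDs, and the PySpark equivalent is only shown as a comment). The idea is Sean's point taken one step further: if the pairing is carried inside each record instead of being reconstructed positionally via zip(), the result is immune to reordering.

```python
import random

# Stand-in for the 'data' RDD in the second example above.
data = ['v%d,v%d' % (i, i) for i in range(100)]

# Order-sensitive: derive two sequences and zip them positionally.
# If either side is reordered (as a shuffle or recomputation can do),
# zip() silently pairs unrelated elements.
d1 = [s.split(',')[0] for s in data]
d2 = [s.split(',')[1] for s in random.sample(data, len(data))]  # simulate reordering
mispaired = list(zip(d1, d2))  # pairs may no longer correspond

# Order-independent: split each record once, so both values travel
# together inside the record and no positional alignment is needed.
# Hypothetical PySpark equivalent:
#   x = data.map(lambda s: tuple(s.split(',')[:2]))
x = [tuple(s.split(',')[:2]) for s in data]
print(x[:3])  # [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2')]
```

Where the two sides genuinely come from different RDDs (as in the labels-and-predictions case), sorting both by a shared key before zipping, as Sean suggests, serves the same purpose.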