You can also wrap a Mahout context around an existing Spark session (a.k.a. context); a sketch of that follows below.
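For reference, roughly what that wrapping might look like. This is untested and from memory, much like Pat's reply below: the Kryo property values, the MahoutKryoRegistrator class, and the SparkDistributedContext / sc2sdc names are assumptions to verify against your Mahout version, not a confirmed recipe.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.mahout.sparkbindings._

// Give the existing/underlying Spark context the Kryo settings Mahout expects
// (assumed to be what Mahout's own mahoutSparkContext() helper would apply).
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("SparkExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")

val sc = new SparkContext(conf)

// Wrap the existing SparkContext in a Mahout SparkDistributedContext; the
// sparkbindings package object also provides sc2sdc/sdc2sc conversions.
implicit val mc: SparkDistributedContext = new SparkDistributedContext(sc)

With mc implicit in scope, Mahout's distributed operations and the plain Spark API can share the same underlying context.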
On Sat, May 7, 2016 at 9:41 PM, Rohit Jain <rohitkjai...@gmail.com> wrote:
> Yes, we did figure out this problem and realised that instead of SparkContext
> I have to use mahoutSparkContext.
>
> On Sun, May 8, 2016 at 4:26 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
> >
> > I think you have to create a SparkDistributedContext, which has Mahout-specific
> > Kryo serialization and adds the Mahout jars. If you let Mahout create the Spark
> > context it's simpler:
> >
> > implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")
> >
> > As I recall the sc will then be implicit through a conversion in the Mahout
> > Spark package, but if I'm wrong and you still get errors, create the sc with
> >
> > val sc = sdc2sc(mc)
> >
> > This is from memory; your debugger may provide better help. I suspect the
> > error below is from not having the Kryo serialization classes configured
> > properly.
> >
> > On May 7, 2016, at 3:05 AM, Rohit Jain <rohitkjai...@gmail.com> wrote:
> >
> > Hello everyone,
> >
> > I want to run the Spark RowSimilarity recommender on data obtained from
> > MongoDB. For this purpose I've written the code below, which takes input from
> > Mongo and converts it to an RDD of objects. This is passed to
> > IndexedDatasetSpark, which is then passed to SimilarityAnalysis.rowSimilarityIDS.
> >
> > import org.apache.hadoop.conf.Configuration
> > import org.apache.mahout.math.cf.SimilarityAnalysis
> > import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
> > import org.apache.spark.rdd.{NewHadoopRDD, RDD}
> > import org.apache.spark.{SparkConf, SparkContext}
> > import org.bson.BSONObject
> > import com.mongodb.hadoop.MongoInputFormat
> >
> > object SparkExample extends App {
> >   val mongoConfig = new Configuration()
> >   mongoConfig.set("mongo.input.uri",
> >     "mongodb://my_mongo_ip:27017/db.collection")
> >
> >   val sparkConf = new SparkConf()
> >   val sc = new SparkContext("local", "SparkExample", sparkConf)
> >
> >   val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
> >     mongoConfig,
> >     classOf[MongoInputFormat],
> >     classOf[Object],
> >     classOf[BSONObject]
> >   )
> >   val new_doc: RDD[(String, String)] = documents.map(
> >     doc1 => (
> >       doc1._2.get("product_id").toString(),
> >       doc1._2.get("product_attribute_value").toString().replace("[ \"", "")
> >         .replace("\"]", "").split("\" , \"")
> >         .map(value => value.toLowerCase.replace(" ", "-")).mkString(" ")
> >     )
> >   )
> >   var myIDs = IndexedDatasetSpark(new_doc)(sc)
> >
> >   SimilarityAnalysis.rowSimilarityIDS(myIDs)
> >     .dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
> > }
> >
> > After running the code I am getting this error:
> >
> > java.io.NotSerializableException: org.apache.mahout.math.DenseVector
> > Serialization stack:
> >   - object not serializable (class: org.apache.mahout.math.DenseVector,
> >     value: {3:1.0,8:1.0,10:1.0})
> >   - field (class: scala.Some, name: x, type: class java.lang.Object)
> >   - object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
> >   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> >   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> >   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > Please help me with this.
> >
> > --
> > Thanks & Regards,
> >
> > *Rohit Jain*
> > Web developer | Consultant
> > Mob +91 8097283931
> >
>
> --
> Thanks & Regards,
>
> *Rohit Jain*
> Web developer | Consultant
> Mob +91 8097283931
>
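Putting Pat's suggestion into the original program, a minimal sketch might look like the following. It is untested and from memory: it lets mahoutSparkContext() create and configure the context (which should resolve the NotSerializableException on DenseVector), reuses that context for the Mongo Hadoop RDD, and relies on dfsWrite's default schema in place of the post's readWriteSchema, so adjust names for your Mahout version.

import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  // Let Mahout create the Spark context so Kryo and the Mahout jars are configured.
  implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "SparkExample")

  // The plain SparkContext is still available for the Mongo Hadoop input format.
  val sc = sdc2sc(mc)

  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject])

  // (product_id, space-separated attribute tokens) pairs, as in the original post.
  val docs: RDD[(String, String)] = documents.map { case (_, doc) =>
    (doc.get("product_id").toString,
      doc.get("product_attribute_value").toString
        .replace("[ \"", "").replace("\"]", "")
        .split("\" , \"")
        .map(_.toLowerCase.replace(" ", "-"))
        .mkString(" "))
  }

  val ids = IndexedDatasetSpark(docs)(sc)

  // dfsWrite picks up the implicit Mahout context; pass the original post's
  // readWriteSchema here if a non-default schema is required.
  SimilarityAnalysis.rowSimilarityIDS(ids)
    .dfsWrite("hdfs://myhadoop:9000/myfile")
}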