Here's what we've tried so far as a first example of a custom Mongo receiver:

    // Imports assumed for Spark 0.9.x and ReactiveMongo 0.10.x; adjust to your versions.
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.NetworkReceiver
    import play.api.libs.iteratee.Iteratee
    import reactivemongo.api.{MongoDriver, QueryOpts}
    import reactivemongo.api.collections.default.BSONCollection
    import reactivemongo.bson.BSONDocument
    import scala.concurrent.ExecutionContext.Implicits.global

    // Note: `host` is currently unused; the server name is hard-coded below.
    class MongoStreamReceiver(host: String) extends NetworkReceiver[String] {

      protected lazy val blocksGenerator: BlockGenerator =
        new BlockGenerator(StorageLevel.MEMORY_AND_DISK_SER_2)

      protected def onStart() = {
        blocksGenerator.start()

        val driver = new MongoDriver
        val connection = driver.connection(List("m01-pdp2"))
        val db = connection.db("local")
        val collection = db.collection[BSONCollection]("oplog.rs")

        // Only follow insert operations in the oplog.
        val query = BSONDocument("op" -> "i")

        // Tailable cursor: keeps waiting for new oplog entries.
        val enumerator = collection.
          find(query).
          options(QueryOpts().tailable.awaitData).
          cursor[BSONDocument].
          enumerate()

        // Push each document into the block generator as a string.
        val processor: Iteratee[BSONDocument, Unit] = Iteratee.foreach { doc =>
          blocksGenerator += BSONDocument.pretty(doc)
        }

        enumerator |>>> processor
      }

      protected def onStop() {
        blocksGenerator.stop()
      }
    }

However, this code does not work: no data ever shows up in the stream. We suspect serialization issues, but there are no logs to confirm this.

Note that if we comment out the ReactiveMongo-related code and put something like this in its place, the code runs fine:

    for (i <- 0 until 1000) {
      blocksGenerator += "hello world"
      Thread.sleep(1000)
    }

The Java socket example (found here <http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html>) works fine as well.

Any hints?
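
One way the "no logs" problem might be narrowed down: `enumerator |>>> processor` returns a Future, so `onStart()` returns immediately, and any failure inside that Future is dropped unless a callback is attached. The sketch below only illustrates that idea with the standard library. `consume` and `describe` are hypothetical names, and `consume` is a stand-in for the ReactiveMongo call, not its real behaviour.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SurfaceFutureFailure {
  // Hypothetical stand-in for `enumerator |>>> processor`:
  // an asynchronous computation that may fail without anyone noticing.
  def consume(fail: Boolean): Future[Unit] = Future {
    if (fail) throw new IllegalStateException("simulated connection failure")
  }

  // Turn the outcome into a message so a failure is reported
  // instead of being silently discarded.
  def describe(f: Future[Unit]): Future[String] =
    f.map(_ => "completed").recover { case e => s"failed: ${e.getMessage}" }

  def main(args: Array[String]): Unit = {
    val outcome = Await.result(describe(consume(fail = true)), 5.seconds)
    println(outcome) // prints "failed: simulated connection failure"
  }
}
```

In the receiver itself, the equivalent would presumably be attaching a failure callback to the result of `enumerator |>>> processor` and logging the exception there; whether that reveals a serialization error or something else is not something this sketch can tell.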