Hi Buntu,

You could do something similar to the following:
val receiver_stream = new ReceiverInputDStream(ssc) {
  override def getReceiver(): Receiver[Nothing] = ??? // Whatever
}.map((x: String) => (null, x))

val config = new Configuration()
config.set("mongo.output.uri", "mongodb://akhld:27017/sigmoid.output")

receiver_stream.foreachRDD(rdd => {
  // make sure your rdd contains a key, value
  val pair_rdd = new PairRDDFunctions[Null, String](rdd)
  pair_rdd.saveAsNewAPIHadoopFile("/home/akhld/sigmoid/beta/",
    classOf[Any], classOf[Any],
    classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
})

Thanks
Best Regards

On Tue, Oct 21, 2014 at 11:59 PM, Buntu Dev <buntu...@gmail.com> wrote:

> Thanks Akhil,
>
> I tried this but I'm running into a similar error:
>
> ~~~~~~
> val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
>   topicpMap).map(_._2)
> stream.map(message => {
>   (null, message)
> }).saveAsNewAPIHadoopFile(destination, classOf[Void],
>   classOf[Group], classOf[ExampleOutputFormat], conf)
> ~~~~~~
>
> Error:
> value saveAsNewAPIHadoopFile is not a member of
> org.apache.spark.rdd.RDD[(Null, String)]
>
> How do I go about converting to PairRDDFunctions?
>
> On Fri, Oct 10, 2014 at 12:01 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> You can convert this ReceiverInputDStream
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream>
>> into PairRDDFunctions
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions>
>> and call saveAsNewAPIHadoopFile.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev <buntu...@gmail.com> wrote:
>>
>>> Basically I'm attempting to convert a JSON stream to Parquet and I get
>>> this error without the .values or .map(_._2):
>>>
>>> value saveAsNewAPIHadoopFile is not a member of
>>> org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]
>>>
>>> On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> Your RDD does not contain pairs, since you ".map(_._2)" (BTW that can
>>>> just be ".values"). "Hadoop files" means "SequenceFiles", and those
>>>> store key-value pairs. That's why the method only appears for
>>>> RDD[(K,V)].
>>>>
>>>> On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev <buntu...@gmail.com> wrote:
>>>> > Thanks Sean, but I'm importing
>>>> > org.apache.spark.streaming.StreamingContext._
>>>> >
>>>> > Here are the spark imports:
>>>> >
>>>> > import org.apache.spark.streaming._
>>>> > import org.apache.spark.streaming.StreamingContext._
>>>> > import org.apache.spark.streaming.kafka._
>>>> > import org.apache.spark.SparkConf
>>>> >
>>>> > ....
>>>> >
>>>> > val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>> >   topicpMap).map(_._2)
>>>> > stream.saveAsNewAPIHadoopFile(destination,
>>>> >   classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)
>>>> >
>>>> > .... 
>>>> >
>>>> > Anything else I might be missing?
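Putting the pieces of this thread together, below is a minimal end-to-end sketch
of writing a Kafka DStream out with saveAsNewAPIHadoopFile. It is untested and
assumes Spark 1.x, where the implicit conversion from RDD[(K, V)] to
PairRDDFunctions lives in org.apache.spark.SparkContext._ — so that import (not
just streaming.StreamingContext._) is what makes the method resolve, instead of
constructing PairRDDFunctions by hand. The TextOutputFormat and the
zkQuorum/group/topic values are stand-ins for the Mongo/Parquet output formats
and settings discussed above.

~~~~~~
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._ // brings rddToPairRDDFunctions into scope (Spark 1.x)
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamToHadoopFile {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamToHadoopFile")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Stand-in values; substitute your own ZooKeeper quorum, consumer
    // group and topic map.
    val zkQuorum = "localhost:2181"
    val group = "stream-group"
    val topicpMap = Map("events" -> 1)

    // createStream yields a DStream[(String, String)]; keep only the message bodies.
    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

    stream.foreachRDD { (rdd, time) =>
      // Pair each message with a key; the Writable types match TextOutputFormat below.
      val pairs = rdd.map(msg => (NullWritable.get(), new Text(msg)))
      // With SparkContext._ imported, saveAsNewAPIHadoopFile resolves on
      // RDD[(NullWritable, Text)] via the implicit PairRDDFunctions wrapper.
      pairs.saveAsNewAPIHadoopFile(
        s"/tmp/stream-output-${time.milliseconds}",
        classOf[NullWritable],
        classOf[Text],
        classOf[TextOutputFormat[NullWritable, Text]],
        new Configuration())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~~~~

Whichever OutputFormat you use, the key and value classes passed to
saveAsNewAPIHadoopFile should match its type parameters (e.g. Void/Group for
Parquet's ExampleOutputFormat, as in the snippet earlier in the thread).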