Hi Buntu,

You could something similar to the following:

 val receiver_stream = new ReceiverInputDStream(ssc) {
>       override def getReceiver(): Receiver[Nothing] = ??? //Whatever
>     }.map((x : String) => (null, x))
>     val config = new Configuration()
>     config.set("mongo.output.uri", "mongodb://akhld:27017/sigmoid.output")
>     receiver_stream.foreachRDD(rdd => {
>       val pair_rdd = new PairRDDFunctions[Null, String](rdd) // make sure
> your rdd contains a key, value
>       *pair_rdd.saveAsNewAPIHadoopFile("/home/akhld/sigmoid/beta/",
> classOf[Any], classOf[Any],
> classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)*
>     })


Thanks
Best Regards

On Tue, Oct 21, 2014 at 11:59 PM, Buntu Dev <buntu...@gmail.com> wrote:

> Thanks Akhil,
>
> I tried this but running into similar error:
>
> ~~~~~~
>  val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).map(_._2)
>  stream.map(message => {
>      (null, message)
>
>                    }).saveAsNewAPIHadoopFile (destination, classOf[Void],
> classOf[Group], classOf[ExampleOutputFormat], conf)
> ~~~~
>
> Error:
> value saveAsNewAPIHadoopFile is not a member of
> org.apache.spark.rdd.RDD[(Null, String)]
>
>
> How do I go about converting to PairRDDFunctions?
>
>
> On Fri, Oct 10, 2014 at 12:01 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> You can convert this ReceiverInputDStream
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream>
>> into PairRDDFuctions
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions>
>> and call the saveAsNewAPIHadoopFile.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev <buntu...@gmail.com> wrote:
>>
>>> Basically I'm attempting to convert a JSON stream to Parquet and I get
>>> this error without the .values or .map(_._2) :
>>>
>>>  value saveAsNewAPIHadoopFile is not a member of
>>> org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]
>>>
>>>
>>>
>>>
>>> On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> Your RDD does not contain pairs, since you ".map(_._2)" (BTW that can
>>>> just be ".values"). "Hadoop files" means "SequenceFiles" and those
>>>> store key-value pairs. That's why the method only appears for
>>>> RDD[(K,V)].
>>>>
>>>> On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev <buntu...@gmail.com> wrote:
>>>> > Thanks Sean, but I'm importing
>>>> org.apache.spark.streaming.StreamingContext._
>>>> >
>>>> > Here are the spark imports:
>>>> >
>>>> > import org.apache.spark.streaming._
>>>> >
>>>> > import org.apache.spark.streaming.StreamingContext._
>>>> >
>>>> > import org.apache.spark.streaming.kafka._
>>>> >
>>>> > import org.apache.spark.SparkConf
>>>> >
>>>> > ....
>>>> >
>>>> >     val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>> > topicpMap).map(_._2)             stream.saveAsNewAPIHadoopFile
>>>> (destination,
>>>> > classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)
>>>> >
>>>> > ....
>>>> >
>>>> > Anything else I might be missing?
>>>>
>>>
>>>
>>
>

Reply via email to