Hi,
In that case, you can try the following.

val joinRDD = kafkaStream.transform { streamRDD =>
  // collect the ids to the driver (transform's body runs on the driver)
  val ids = streamRDD.map(_._2).collect()

  // look up the user name for each id, one query per id;
  // better: build a single query which checks all those ids at the same time
  val userNames = ids.map(userId =>
    ctable.select("user_name").where("userid = ?", userId).toArray(0).get[String](0))

  // transform must return an RDD
  streamRDD.sparkContext.parallelize(ids.zip(userNames))
}
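
If a batch can contain many ids, a single query for the whole batch avoids one
round trip per id. A minimal sketch, assuming the version of
spark-cassandra-connector you use can bind a Scala collection to an IN clause
(worth double-checking for your version):

val joinRDD = kafkaStream.transform { streamRDD =>
  val ids = streamRDD.map(_._2).collect()

  // one query for the whole batch;
  // assumes "userid in ?" accepts a bound Scala collection
  ctable.select("userid", "user_name").where("userid in ?", ids.toList)
}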


On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko <s...@dicefield.com> wrote:

>  Hi Madhu,
> Thanks for your response!
> But as I understand it, in this case you select all the data from the
> Cassandra table. I don't want to do that, as it can be huge; I just want to
> look up some ids in the table. So I don't see how I can put values from
> the streamRDD into the Cassandra query (into the "where" method).
>
> Greg
>
>
>
> On 1/23/15 1:11 AM, madhu phatak wrote:
>
> Hi,
> Seems like you want to get the user name for a given user id. You can use
> transform on the kafka stream to join two RDDs. The pseudo code looks like
> this:
>
>  val joinRDD = kafkaStream.transform( streamRDD => {
>    streamRDD.map(value => (value._2, value._1)) join with
>      (ctable.select("userid", "user_name"))
>  })
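>
> In real code, both sides of the join need to be keyed the same way. A
> minimal sketch, assuming userid is a String and the columns are named
> userid and user_name as in your snippet:
>
>  import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x
>
>  val joinRDD = kafkaStream.transform { streamRDD =>
>    val byId  = streamRDD.map { case (key, userId) => (userId, key) }
>    val names = ctable.select("userid", "user_name")
>      .map(row => (row.getString("userid"), row.getString("user_name")))
>    byId.join(names)  // yields (userId, (key, userName))
>  }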
>
> On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko <s...@dicefield.com>
> wrote:
>
>>  Hi there,
>>
>> I think I have a basic question, but I'm stuck figuring out how to
>> approach it, and I thought someone could point me in the right
>> direction.
>>
>> I'd like to pull some data from Cassandra based on values received from an
>> input stream. Something like:
>>
>> val ctable = ssc.cassandraTable("keyspace", "users")
>> val userNames = kafkaStream.flatMap {
>>   case (key, userId) =>
>>     val userName = ctable.select("user_name")
>>       .where("userid = ?", userId).toArray(0).get[String](0)
>>     Some((userId, userName))
>> }
>>
>>
>> While the Cassandra query works in the Spark shell, it throws an exception
>> when I use it inside flatMap:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
>> java.lang.NullPointerException:
>>         org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
>>
>> com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
>>
>> com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
>>
>> com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)
>>
>> My understanding is that I cannot produce an RDD (Cassandra results)
>> inside another RDD. But how should I approach the problem instead?
>>
>>
>>
>> Thanks,
>>
>> --
>> Greg
>>
>>
>
>
>  --
>  Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com
>
>
>
> --
> Greg
>
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com
