Hi,

In that case, you can try the following:

val joinRDD = kafkaStream.transform( streamRDD => {
  // collect this batch's ids on the driver, then look each one up in Cassandra
  val ids = streamRDD.map(_._2).collect()
  val names = ids.map(userId =>
    (userId, ctable.select("user_name").where("userid = ?", userId).collect().head.get[String](0)))
  // better: create a single query which checks for all those ids at the same time
  streamRDD.sparkContext.parallelize(names.toSeq)
})
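For example, the single-query version could look roughly like this (an untested sketch; it assumes userid is the partition key and that the connector's where clause can bind a CQL "IN ?" to the collected list of ids):

val joinRDD = kafkaStream.transform( streamRDD => {
  // one Cassandra query per batch instead of one query per id
  val ids = streamRDD.map(_._2).distinct().collect().toList
  ctable
    .select("userid", "user_name")
    .where("userid in ?", ids)
    .map(row => (row.get[String]("userid"), row.get[String]("user_name")))
})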
On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko <s...@dicefield.com> wrote:

> Hi Madhu,
> Thanks for your response!
> But as I understand it, in this case you select all the data from the Cassandra
> table. I don't want to do that, as it can be huge. I only want to look up some
> ids in the table. So it isn't clear to me how I can pass values from the
> streamRDD into the Cassandra query (into the "where" method).
>
> Greg
>
> On 1/23/15 1:11 AM, madhu phatak wrote:
>
> Hi,
> Seems like you want to get the user name for a given user id. You can use
> transform on the Kafka stream to join the two RDDs. The pseudo code looks
> like this:
>
> val joinRDD = kafkaStream.transform( streamRDD => {
>   streamRDD.map(value => (value._2, value._1)) join with
>     (ctable.select("userid,username"))
> })
>
> On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko <s...@dicefield.com>
> wrote:
>
>> Hi there,
>>
>> I think I have a basic question, but I'm sort of stuck figuring out how
>> to approach it, and I thought someone could point me in the right
>> direction.
>>
>> I'd like to pull some data from Cassandra based on values received from
>> an input stream. Something like
>>
>> val ctable = ssc.cassandraTable("keyspace", "users")
>> val userNames = kafkaStream.flatMap {
>>   case (key, userId) => {
>>     val userName = ctable.select("user_name").where("userid = ?",
>>       userId).toArray(0).get[String](0)
>>     Some(userId, userName)
>>   }
>> }
>>
>> While the Cassandra query works in the Spark shell, it throws an
>> exception when I use it inside flatMap:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
>> java.lang.NullPointerException:
>> org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
>> com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
>> com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
>> com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)
>>
>> My understanding is that I cannot produce an RDD (the Cassandra results)
>> inside another RDD. But how should I approach the problem instead?
>>
>> Thanks,
>>
>> --
>> Greg
>>
>
> --
> Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com
>
> --
> Greg
>

--
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com
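For completeness: newer versions of the spark-cassandra-connector (1.2 and later) also provide joinWithCassandraTable, which performs this kind of per-key lookup directly, without collecting the ids on the driver or scanning the whole table. A rough, untested sketch, reusing the keyspace, table, and column names from the code above (the exact API may differ between connector versions):

import com.datastax.spark.connector._

val joinRDD = kafkaStream.transform( streamRDD => {
  streamRDD
    .map { case (key, userId) => Tuple1(userId) }   // the join key has to be a tuple or case class
    .joinWithCassandraTable("keyspace", "users")
    .on(SomeColumns("userid"))
    .map { case (Tuple1(userId), row) => (userId, row.get[String]("user_name")) }
})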