Re: Streaming: getting data from Cassandra based on input stream values

2015-01-23 Thread madhu phatak
Hi,
In that case, you can try the following.

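val joinRDD = kafkaStream.transform { streamRDD =>
  // The transform body runs on the driver once per batch, so RDD
  // operations (collect, ctable.select/where) are allowed here.
  val ids = streamRDD.map(_._2).collect()

  val pairs = ids.map { userId =>
    val userName = ctable.select("user_name").where("userid = ?", userId)
      .first().get[String](0)
    (userId, userName)
  }

  // transform must return an RDD, so wrap the driver-side results.
  // Better: create one query that checks all those ids at the same time.
  streamRDD.sparkContext.parallelize(pairs)
}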
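A minimal sketch of the "one query for all ids" idea, assuming `userid` is a column that accepts a CQL `IN` restriction (for example the partition key; whether it does depends on your schema and Cassandra version). `InClause.build` is a hypothetical helper, not part of the Spark Cassandra connector: it de-duplicates the ids collected from a batch, splits them into chunks of at most `maxPerQuery` values, and produces one `userid IN (?, ...)` predicate plus its bind values per chunk.

```scala
// Hypothetical helper (not part of the Spark Cassandra connector): turns a batch
// of ids into one or more "column IN (?, ?, ...)" predicates plus bind values.
object InClause {
  /** De-duplicates `ids`, splits them into chunks of at most `maxPerQuery`
    * values, and builds one CQL predicate per chunk,
    * e.g. ("userid IN (?, ?)", Seq("u1", "u2")). */
  def build(column: String, ids: Seq[String], maxPerQuery: Int): Seq[(String, Seq[Any])] = {
    require(maxPerQuery > 0, "maxPerQuery must be positive")
    ids.distinct.grouped(maxPerQuery).map { chunk =>
      // one "?" placeholder per value in this chunk
      val placeholders = chunk.map(_ => "?").mkString(", ")
      (s"$column IN ($placeholders)", chunk: Seq[Any])
    }.toList
  }

  def main(args: Array[String]): Unit = {
    val clauses = build("userid", Seq("u1", "u2", "u1", "u3"), maxPerQuery = 2)
    clauses.foreach { case (cql, values) => println(s"$cql <- ${values.mkString(", ")}") }
  }
}
```

Inside the `transform` body, each pair could then be passed to the connector's `where(cql, values: _*)`, e.g. `ctable.select("userid", "user_name").where(cql, values: _*)`, and the per-chunk results combined with `union`. Capping the chunk size is a judgment call: it keeps each `IN` list short, since very long `IN` lists concentrate work on a single coordinator node. Later connector releases also add `joinWithCassandraTable` for this lookup pattern; check whether your version has it.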


On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko  wrote:

>  Hi Madhu,
> Thanks for your response!
> But as I understand it, in this case you select all the data from the
> Cassandra table. I don't want to do that, as the table can be huge; I just
> want to look up some ids in it. So it isn't clear to me how I can pass
> values from the streamRDD into the Cassandra query (the "where" method).
>
> Greg
>
>
>
> On 1/23/15 1:11 AM, madhu phatak wrote:
>
> Hi,
> It seems like you want to get the user name for a given user id. You can use
> transform on the Kafka stream to join two RDDs. The pseudocode looks like
> this:
>
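>  // Spark < 1.3 also needs: import org.apache.spark.SparkContext._
>  val joinRDD = kafkaStream.transform { streamRDD =>
>    // key the stream records by user id: (userId, kafkaKey)
>    val byId = streamRDD.map { case (key, userId) => (userId, key) }
>    // key the Cassandra rows by user id (assumes userid is a text column)
>    val users = ctable.select("userid", "user_name")
>      .map(row => (row.getString("userid"), row.getString("user_name")))
>    byId.join(users) // (userId, (kafkaKey, userName))
>  }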
>
> On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko 
> wrote:
>
>>  Hi there,
>>
>> I think I have a basic question, but I'm sort of stuck with figuring out
>> how to approach it, and I thought someone could point me to the right
>> direction.
>>
>> I'd like to pull some data from Cassandra based on values received from an
>> input stream. Something like
>>
>> val ctable = ssc.cassandraTable("keyspace", "users")
>> val userNames = kafkaStream.flatMap {
>>   case (key, userId) =>
>>     // ctable is an RDD, used here inside another RDD operation's closure
>>     val userName = ctable.select("user_name").where("userid = ?", userId)
>>       .first().get[String](0)
>>     Some((userId, userName))
>> }
>>
>>
>> While the Cassandra query works in the Spark shell, it throws an exception
>> when I use it inside flatMap:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
>> java.lang.NullPointerException:
>> org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
>> com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
>> com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
>> com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)
>>
>> My understanding is that I cannot produce an RDD (Cassandra results)
>> inside another RDD. But how should I approach the problem instead?
>>
>>
>>
>> Thanks,
>>
>> --
>> Greg
>>
>>
>
>
>  --
>  Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com
>
>
>
> --
> Greg
>
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Streaming: getting data from Cassandra based on input stream values

2015-01-22 Thread Greg Temchenko

Hi there,

I think I have a basic question, but I'm sort of stuck with figuring out 
how to approach it, and I thought someone could point me to the right 
direction.


I'd like to pull some data from Cassandra based on values received from an
input stream. Something like


   val ctable = ssc.cassandraTable("keyspace", "users")
   val userNames = kafkaStream.flatMap {
     case (key, userId) =>
       // ctable is an RDD, used here inside another RDD operation's closure
       val userName = ctable.select("user_name").where("userid = ?", userId)
         .first().get[String](0)
       Some((userId, userName))
   }


While the Cassandra query works in the Spark shell, it throws an exception
when I use it inside flatMap:


Exception in thread "main" org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent 
failure: Lost task 0.0 in stage 46.0 (TID 35, localhost): 
java.lang.NullPointerException:
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)

My understanding is that I cannot produce an RDD (Cassandra results) 
inside another RDD. But how should I approach the problem instead?
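That understanding matches how Spark works: an RDD such as `ctable` can only be transformed on the driver, and the `NullPointerException` in `RDD.<init>` is consistent with `select` trying to build a new RDD inside a task. A common alternative is to query Cassandra directly from each partition, batching ids so there is one query per group of records instead of one per record. The sketch below shows only that batching logic in plain Scala; `BatchedLookup.enrich` and `lookupBatch` are hypothetical names, and the in-memory `Map` stands in for the real `users` table.

```scala
// Sketch of a per-partition batched lookup. `lookupBatch` is a hypothetical
// stand-in for one Cassandra query over many ids; in main() it is backed by an
// in-memory Map so the sketch runs on its own.
object BatchedLookup {
  /** Enriches (key, userId) records with user names, calling `lookupBatch` once
    * per group of `batchSize` records instead of once per record. Records whose
    * id has no match are dropped (like returning None from flatMap). The result
    * is a lazy Iterator, so lookups happen as records are consumed. */
  def enrich(
      records: Iterator[(String, String)],
      batchSize: Int,
      lookupBatch: Seq[String] => Map[String, String]
  ): Iterator[(String, String)] = {
    require(batchSize > 0, "batchSize must be positive")
    records.grouped(batchSize).flatMap { batch =>
      // one lookup for all distinct ids in this batch
      val names = lookupBatch(batch.map(_._2).distinct)
      batch.flatMap { case (_, userId) => names.get(userId).map(name => (userId, name)) }
    }
  }

  def main(args: Array[String]): Unit = {
    val table = Map("u1" -> "alice", "u2" -> "bob") // fake "users" table
    var calls = 0
    val lookup: Seq[String] => Map[String, String] = ids => {
      calls += 1
      table.filter { case (id, _) => ids.contains(id) }
    }
    val stream = Iterator(("k1", "u1"), ("k2", "u2"), ("k3", "u9"))
    println(enrich(stream, batchSize = 2, lookup).toList) // List((u1,alice), (u2,bob))
    println(s"lookup calls: $calls") // lookup calls: 2
  }
}
```

In a streaming job the same function could be applied with `kafkaStream.mapPartitions(records => BatchedLookup.enrich(records, 100, lookupBatch))`, where `lookupBatch` would run a single `SELECT userid, user_name ... WHERE userid IN (...)` query through the connector's `CassandraConnector(sc.getConf).withSessionDo { session => ... }`. The `CassandraConnector` is serializable, so it can be created on the driver and captured in the closure; the exact query, the batch size of 100, and the assumption that `userid` allows an `IN` restriction should all be adapted to your schema.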




Thanks,

--
Greg