Hi there,
I think I have a basic question, but I'm stuck figuring out how to
approach it, and I was hoping someone could point me in the right
direction.
I'd like to pull some data from Cassandra based on values received from an
input stream. Something like
val ctable = ssc.cassandraTable("keyspace", "users")
val userNames = kafkaStream.flatMap {
  case (key, userId) =>
    // look up the user name for this userId in Cassandra
    val userName = ctable
      .select("user_name")
      .where("userid = ?", userId)
      .collect()(0)
      .get[String](0)
    Some((userId, userName))
}
While the Cassandra query works in the Spark shell, it throws an exception
when I use it inside flatMap:
Exception in thread "main" org.apache.spark.SparkException: Job aborted
due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
java.lang.NullPointerException:
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)
My understanding is that I cannot produce an RDD (Cassandra results)
inside another RDD. But how should I approach the problem instead?
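For what it's worth, one idea I was considering is to drop the nested RDD
entirely and query Cassandra with a plain CQL session inside mapPartitions,
roughly like the sketch below. This is just a guess on my part, not something
I've verified: it assumes the connector's CassandraConnector class, and the
same keyspace/table/column names as in the snippet above.

import com.datastax.spark.connector.cql.CassandraConnector

// Serializable handle; opens/reuses a session per executor
val connector = CassandraConnector(ssc.sparkContext.getConf)

val userNames = kafkaStream.mapPartitions { iter =>
  connector.withSessionDo { session =>
    // Prepare once per partition, then bind per record
    val stmt = session.prepare(
      "SELECT user_name FROM keyspace.users WHERE userid = ?")
    iter.map { case (key, userId) =>
      val row = session.execute(stmt.bind(userId)).one()
      (userId, row.getString("user_name"))
    }.toList.iterator // materialize before the session block ends
  }
}

Is that closer to the intended way of doing per-record lookups, or is there
a better pattern for this?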
Thanks,
--
Greg