Hi there,

I think I have a basic question, but I'm a bit stuck on how to approach it, and I was hoping someone could point me in the right direction.

I'd like to pull some data from Cassandra based on values received from an input stream. Something like:

   val ctable = ssc.cassandraTable("keyspace", "users")
   val userNames = kafkaStream.flatMap {
      case (key, userId) =>
        val userName = ctable.select("user_name")
          .where("userid = ?", userId).toArray(0).get[String](0)
        Some((userId, userName))
   }


While the Cassandra query works in the Spark shell, it throws an exception when I use it inside flatMap:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 35, localhost): java.lang.NullPointerException:
        org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
        com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
        com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
        com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)

My understanding is that I can't reference one RDD (the Cassandra table) from inside a transformation on another RDD, since RDDs can't be used in closures that run on the executors. But how should I approach the problem instead?
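
The one alternative I've come up with is to skip the nested RDD entirely and run a plain query on the executors via CassandraConnector, which I believe is serializable and can be captured by the closure. Here's a rough, untested sketch of what I mean (assuming the connector's CassandraConnector.withSessionDo API and a users table keyed by userid):

   import com.datastax.spark.connector.cql.CassandraConnector

   // Built once on the driver; CassandraConnector is serializable,
   // so the closure can ship it to the executors.
   val connector = CassandraConnector(ssc.sparkContext.getConf)

   val userNames = kafkaStream.flatMap {
      case (key, userId) =>
        connector.withSessionDo { session =>
          // One point query per record -- no nested RDD involved.
          val row = session.execute(
            "SELECT user_name FROM keyspace.users WHERE userid = ?",
            userId).one()
          // Drop records with no matching user.
          Option(row).map(r => (userId, r.getString("user_name")))
        }
   }

Is that a reasonable way to do it, or is there a more idiomatic join-style approach I'm missing?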



Thanks,

--
Greg
