I want to read file data and check whether each file line is already present in Cassandra; if it is, the records need to be merged, otherwise it is a fresh insert into C*. The file data just contains a name and address in JSON format. In Cassandra, the student table has a UUID as its primary key and a secondary index on name.
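For reference, each line of the input file is a small JSON document, roughly like this (the values here are made up):

    {"name": "John Doe", "address": "123 Main St"}

and the table looks something like the following sketch (UUID primary key plus a secondary index on name, matching the Student class further below):

    CREATE TABLE keyspace.student (
        id uuid PRIMARY KEY,
        name text,
        address text
    );
    CREATE INDEX ON keyspace.student (name);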
Once the data is merged into Cassandra I want to send the new UUID (or the existing one) to Kafka. When I run locally, or on a single machine in the Mesos cluster (keeping the line sparkConf.setMaster("local[4]");), this program works; but when I submit it to the Mesos master with 4 slaves (commenting out //sparkConf.setMaster("local[4]"); on the cluster), I get a NullPointerException while selecting data from Cassandra through the JavaStreamingContext. I made the streaming context static because earlier it was throwing a serialization exception, since it was being accessed inside the map transformation on the file DStream.

Is there something wrong with this approach? Or is it because I am trying to build a Cassandra RDD within the DStream map transformation, and that is what is causing the issue?

import kafka.producer.KeyedMessage;

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriter;
import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriterFactory;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class DStreamExample {

    public DStreamExample() {
    }

    private static JavaStreamingContext ssc;

    public static void main(final String[] args) {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("SparkJob");
        sparkConf.setMaster("local[4]"); // for local
        sparkConf.set("spark.cassandra.connection.host", cassandra_hosts);

        ssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        final JavaDStream<Student> studentFileDStream = ssc.textFileStream(
                "/usr/local/fileDir/").map(line -> {
            final Gson gson = new Gson();
            final JsonParser parser = new JsonParser();
            final JsonObject jsonObject = parser.parse(line)
                    .getAsJsonObject();
            final Student studentFile = gson.fromJson(jsonObject, Student.class);

            // generate a new UUID
            studentFile.setId(UUID.randomUUID());

            try {
                // NullPointerException at this line when running on the cluster
                final JavaRDD<Student> cassandraStudentRDD =
                        CassandraStreamingJavaUtil.javaFunctions(ssc)
                                .cassandraTable("keyspace", "student",
                                        mapRowTo(Student.class))
                                .where("name=?", studentFile.getName());

                // If the student name is found in the Cassandra table, assign its
                // existing UUID to the file's Student object; this way I won't
                // create multiple records for the same student name.
                final Student studentCassandra = cassandraStudentRDD.first();
                studentFile.setId(studentCassandra.getId());
            } catch (Exception e) {
            }

            return studentFile;
        });

        // Save students to Cassandra
        CassandraStreamingJavaUtil.javaFunctions(studentFileDStream)
                .writerBuilder("keyspace", "student", mapToRow(Student.class))
                .saveToCassandra();

        final JavaDStreamKafkaWriter<Student> writer =
                JavaDStreamKafkaWriterFactory.fromJavaDStream(studentFileDStream);

        final Properties properties = new Properties();
        properties.put("metadata.broker.list", "server:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        // Just send the student's UUID plus "_PUT" to Kafka
        writer.writeToKafka(properties,
                student -> new KeyedMessage<>("TOPICNAME",
                        student.getId() + "_PUT"));

        ssc.start();
        ssc.awaitTermination();
    }
}

class Student {
    private String address;
    private UUID id;
    private String name;

    public Student() {
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, servername): java.lang.NullPointerException
    at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:39)
    at com.ebates.ps.batch.sparkpoc.DStreamPOCExample.lambda$main$d2c4cc2c$1(DStreamPOCExample.java:109)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.cloudera.spark.streaming.kafka.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:47)
    at org.cloudera.spark.streaming.kafka.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
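What I am wondering is whether, instead of building a Cassandra RDD inside the map, I should do the per-record lookup with the connector's CassandraConnector (which is serializable and can be used inside transformations) and a plain driver session. A rough, untested sketch of what I mean — the SELECT assumes the id/name columns of my student table:

    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import com.datastax.spark.connector.cql.CassandraConnector;

    // Build the connector on the driver; it is serializable, so the lambda
    // can carry it to the executors instead of the streaming context.
    final CassandraConnector connector = CassandraConnector.apply(sparkConf);

    final JavaDStream<Student> studentFileDStream = ssc.textFileStream(
            "/usr/local/fileDir/").map(line -> {
        final Student studentFile = new Gson().fromJson(line, Student.class);
        studentFile.setId(UUID.randomUUID());

        // The connector caches sessions internally, so openSession() should be
        // cheap after the first call on each executor.
        try (Session session = connector.openSession()) {
            final Row row = session.execute(
                    "SELECT id FROM keyspace.student WHERE name = ?",
                    studentFile.getName()).one();
            if (row != null) {
                // reuse the existing student's UUID instead of the fresh one
                studentFile.setId(row.getUUID("id"));
            }
        }
        return studentFile;
    });

Would that be the right approach here, or is there a better way to do this kind of lookup-then-merge from inside a DStream?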