I want to read file data and check whether each file line is already present
in Cassandra; if it is present, it needs to be merged, otherwise it is a fresh
insert to C*. The file data just contains name and address in JSON format. In
Cassandra, the student table has a UUID as the primary key and there is a
secondary index on name.
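
For reference, a file line looks roughly like this (field names taken from the
Student class below):

{"name":"John Doe","address":"123 Main St"}

and the Cassandra table is assumed to be something like the following (the
keyspace and index names here are just placeholders):

CREATE TABLE keyspace.student (
    id uuid PRIMARY KEY,
    name text,
    address text
);
CREATE INDEX student_name_idx ON keyspace.student (name);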

Once the data is merged to Cassandra, I want to send the new or existing UUID
to Kafka.

When I run locally, or on a single machine on the Mesos cluster (keeping the
line sparkConf.setMaster("local[4]");), this program works. But when I submit
it to the Mesos master with 4 slaves (commenting out
//sparkConf.setMaster("local[4]"); on the cluster), there is a
NullPointerException on the JavaStreamingContext while selecting data from
Cassandra.

I made the streaming context static, as earlier it was throwing a
serialization exception because it was being accessed inside the map
transformation on the file DStream.

Is there something wrong with the approach, or is it because I am trying to
build a Cassandra RDD within the DStream map transformation, which is causing
the issue?
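
For clarity, the kind of alternative I have in mind instead of building a
Cassandra RDD inside the map would be a per-record lookup through a plain
driver Session obtained from CassandraConnector. This is only an untested
sketch (it assumes CassandraConnector/Session can be used this way from the
map function, and it needs the extra imports listed in the comments):

// Untested sketch: per-record lookup with a driver Session instead of a Cassandra RDD.
// Extra imports this would need:
//   com.datastax.spark.connector.cql.CassandraConnector
//   com.datastax.driver.core.Row
//   com.datastax.driver.core.Session
final CassandraConnector connector = CassandraConnector.apply(sparkConf);

final JavaDStream<Student> studentFileDStream = ssc.textFileStream("/usr/local/fileDir/")
    .map(line -> {
        final Student studentFile = new Gson().fromJson(line, Student.class);
        studentFile.setId(UUID.randomUUID());
        try (final Session session = connector.openSession()) {
            final Row row = session.execute(
                "SELECT id FROM keyspace.student WHERE name = ?",
                studentFile.getName()).one();
            if (row != null) {
                // reuse the existing UUID so the same name is not inserted twice
                studentFile.setId(row.getUUID("id"));
            }
        }
        return studentFile;
    });

My actual code and the stack trace follow.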

import kafka.producer.KeyedMessage;

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;
import java.util.UUID;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriter;
import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriterFactory;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;


public class DStreamExample {


    public DStreamExample() {
    }

  private static JavaStreamingContext ssc;

    public static void main(final String[] args) {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("SparkJob");

        sparkConf.setMaster("local[4]"); // for local
        sparkConf.set("spark.cassandra.connection.host", cassandra_hosts);


        ssc = new JavaStreamingContext(sparkConf,new Duration(2000));


        final JavaDStream<Student> studentFileDStream = ssc.textFileStream(
                "/usr/local/fileDir/").map(line -> {
                final Gson gson = new Gson();
                final JsonParser parser = new JsonParser();

                final JsonObject jsonObject = parser.parse(line)
                    .getAsJsonObject();

                final Student studentFile = gson.fromJson(jsonObject,
                        Student.class);

                // generate a new UUID for the record read from the file
                studentFile.setId(UUID.randomUUID());

            try{
                //NullPointer at this line while running on cluster
                final JavaRDD<Student> cassandraStudentRDD =
                    CassandraStreamingJavaUtil.javaFunctions(ssc)
                    .cassandraTable("keyspace", "student",
                        mapRowTo(Student.class)).where("name=?",
                        studentFile.getName());

                // If the student name is found in the Cassandra table, assign
                // the existing UUID to the studentFile object so that I don't
                // create multiple records for the same name.
                final Student studentCassandra = cassandraStudentRDD.first();
                studentFile.setId(studentCassandra.getId());

            }catch(Exception e){
                // no existing record found (or the lookup failed); keep the new UUID
            }
                return studentFile;

            });

        // Save student to Cassandra
        CassandraStreamingJavaUtil.javaFunctions(studentFileDStream)
            .writerBuilder("keyspace", "student", mapToRow(Student.class))
            .saveToCassandra();


        final JavaDStreamKafkaWriter<Student> writer =
            JavaDStreamKafkaWriterFactory.fromJavaDStream(studentFileDStream);


        final Properties properties = new Properties();
        properties.put("metadata.broker.list", "server:9092");
        properties.put("serializer.class",
"kafka.serializer.StringEncoder");

        // Just send the student UUID + "_PUT" to Kafka
        writer.writeToKafka(properties,
            student ->
                new KeyedMessage<>("TOPICNAME", student.getId() + "_PUT"));

        ssc.start();
        ssc.awaitTermination();


    }

}

class Student {
    private String address;
    private UUID id;
    private String name;

    public Student() {
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}


Stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
(TID 3, servername): java.lang.NullPointerException
        at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:39)
        at com.ebates.ps.batch.sparkpoc.DStreamPOCExample.lambda$main$d2c4cc2c$1(DStreamPOCExample.java:109)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1027)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.cloudera.spark.streaming.kafka.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:47)
        at org.cloudera.spark.streaming.kafka.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:45)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)


