You can put a try..catch (or scala.util.Try) around the body of each transformation and handle such exceptions instead of letting them crash your entire job.
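For example, here is a minimal sketch of that idea using scala.util.Try (the field name "text" and the whitespace split are taken from the snippet in the quoted mail below):

    import scala.util.Try

    val wordRDD = mongoRDD.flatMap { case (_, doc) =>
      Try(doc.get("text").toString)  // throws the NPE when "text" is null/missing
        .toOption                    // failure becomes None ...
        .toSeq                       // ... and None contributes no records
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
    }

Bad records are silently skipped this way; if you want to know how many documents you are dropping, count them with an accumulator instead of discarding them silently.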
Thanks
Best Regards

On Fri, Aug 14, 2015 at 4:35 PM, hide <x22t33...@gmail.com> wrote:

> Hello,
>
> I'm using Spark on a YARN cluster and using mongo-hadoop-connector to pull
> data into Spark and run a job. The job has the following stages:
> (flatMap -> flatMap -> reduceByKey -> sortByKey)
>
> The data in MongoDB is tweets from Twitter.
>
> First, I connect to MongoDB and create an RDD as follows:
>
>     val mongoRDD = sc.newAPIHadoopRDD(mongoConfig,
>       classOf[com.mongodb.hadoop.MongoInputFormat],
>       classOf[Object], classOf[BSONObject])
>
> I set "mongo.input.fields" as below:
>
>     mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")
>
> The data inside mongoRDD looks like:
>
>     (558baf...,
>       { "_id" : { "$oid" : "558baf…" } , "text" : "Apache spark is Lightning-fast cluster …" })
>     (558baf...,
>       { "_id" : { "$oid" : "558baf…" } , "text" : "hello, my …" })
>     (558baf...,
>       { "_id" : { "$oid" : "558baf…" } , "text" : "Hi, aaa …" })
>
> In the next stage I use flatMap; inside it I get the "text" element (the
> tweet) and split it into words:
>
>     val wordRDD = mongoRDD.flatMap(arg => {
>       val str = arg._2.get("text").toString
>       // use a tokenizer to split the tweet into words, or just split on whitespace:
>       str.split("\\s+")
>     })
>
> After this, wordRDD looks like:
>
>     ("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my", ...)
>
> When I try to print every element in wordRDD, I get the following error. I
> know that tweets can contain newline characters, spaces, and tabs, but what
> causes this NPE?
>
> Does "Iterator$$anon$13.hasNext" in this error mean that the RDD is being
> iterated and the next value is null?
>
>     15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
>     15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
>         at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:85)
>         at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:73)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Can I avoid this error by wrapping the word in scala.Option?
> If anybody knows why, please help me.
>
> Thanks
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-endup-with-NPE-tp24264.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
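Regarding the scala.Option question at the end of the quoted mail: most likely yes. arg._2.get("text") returns null for any document that lacks a usable "text" value, so the subsequent .toString is what throws. The Iterator$$anon$13.hasNext frames just reflect that flatMap is lazy: the NPE is raised inside your function (the $anonfun$1.apply frames at <console>:85/:73) and only surfaces when ExternalSorter pulls the iterator during the reduceByKey shuffle. A null-safe sketch of the same flatMap (assuming the mongoRDD from the quoted mail):

    val wordRDD = mongoRDD.flatMap { case (_, doc) =>
      Option(doc.get("text"))   // Option(null) is None, so such documents are skipped
        .map(_.toString)
        .toSeq
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
    }

Note that the "mongo.input.fields" projection only selects fields; it does not filter out documents that are missing "text", so those documents still reach your flatMap.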