Hello, I'm running Spark on a YARN cluster and using the mongo-hadoop connector to pull data from MongoDB into Spark and run a job on it. The job has the following stages: flatMap -> flatMap -> reduceByKey -> sortByKey.
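Roughly, the whole job looks like the sketch below (simplified; the (word, 1) pairs and the plain whitespace split here are only meant to show the shape of the job, the real code is further down):

    val wordRDD = mongoRDD.flatMap(arg => arg._2.get("text").toString.split("\\s+"))  // first flatMap: tokenize "text"
    val pairRDD = wordRDD.flatMap(word => Seq((word, 1)))                             // second flatMap: emit (word, 1) pairs
    val counted = pairRDD.reduceByKey(_ + _)                                          // count occurrences per word
    val sorted  = counted.sortByKey()                                                 // sort the result by word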
The data in MongoDB is tweets from Twitter.

First, I connect to MongoDB and build an RDD like this:

    val mongoRDD = sc.newAPIHadoopRDD(mongoConfig,
      classOf[com.mongodb.hadoop.MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])

and set "mongo.input.fields" as below:

    mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")

The data inside mongoRDD looks like this:

    (558baf..., { "_id" : { "$oid" : "558baf..."} , "text" : "Apache spark is Lightning-fast cluster ..." })
    (558baf..., { "_id" : { "$oid" : "558baf..."} , "text" : "hello, my ..." })
    (558baf..., { "_id" : { "$oid" : "558baf..."} , "text" : "Hi, aaa ..." })

In the next stage I use flatMap: inside the flatMap I get the "text" element (the tweet) and split it into words:

    val wordRDD = mongoRDD.flatMap(arg => {
      val str = arg._2.get("text").toString
      // use a tokenizer, or just split the tweet on whitespace
      str.split("\\s+")
    })

After this, wordRDD looks like:

    ("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my", ...)

When I try to print every element of wordRDD, I get the following error. I know the tweets contain newline characters, spaces and tabs, but what causes this NPE? Does the "Iterator$$anon$13.hasNext" frame mean that, while iterating through the RDD, the next value is null?

    15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
    15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
        at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:85)
        at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:73)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Can I avoid this error by wrapping the value in a scala.Option?

If anybody knows why, please help.

Thanks,
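P.S. To show what I mean by wrapping with scala.Option, something like the rough sketch below is what I had in mind (names made up, and I'm not sure it is the right fix):

    val safeWordRDD = mongoRDD.flatMap { case (_, doc) =>
      Option(doc.get("text"))          // None if the "text" field is missing or null
        .map(_.toString)
        .toSeq
        .flatMap(_.split("\\s+"))      // plain whitespace split instead of the tokenizer
        .filter(_.nonEmpty)
    }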