Hello, 

I'm running Spark on a YARN cluster and using the mongo-hadoop connector to pull
data from MongoDB into Spark for a job.
The job has the following stages:
(flatMap -> flatMap -> reduceByKey -> sortByKey)

The data in MongoDB is tweets from Twitter.

First, I connect to MongoDB and create an RDD as follows:

val mongoRDD = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[com.mongodb.hadoop.MongoInputFormat],
  classOf[Object],
  classOf[BSONObject])

I set "mongo.input.fields" as below, so only "_id" and "text" are projected:
mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")
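For context, the full mongoConfig setup looks roughly like this (the URI below is
a placeholder, not my real cluster address):

```scala
import org.apache.hadoop.conf.Configuration

val mongoConfig = new Configuration()
// placeholder URI -- the real host/database/collection differ
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/twitter.tweets")
// project only the two fields the job needs
mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")
```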

The data inside mongoRDD looks like:

(558baf...,
{ "_id" : { "$oid" : "558baf…" } , "text" : "Apache spark is Lightning-fast
cluster …" })
(558baf...,
{ "_id" : { "$oid" : "558baf…" } , "text" : "hello, my …" })
(558baf...,
{ "_id" : { "$oid" : "558baf…" } , "text" : "Hi, aaa …" })


In the next stage I use flatMap; inside it I get the "text" element (the tweet)
and split it into words.

val wordRDD = mongoRDD.flatMap(arg => {
  val str = arg._2.get("text").toString
  // use a tokenizer to split the tweet into words,
  // or just split the tweet on whitespace
  str.split("\\s+")
})

After this, wordRDD looks like:

("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my",
......)
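The split itself, pulled out as a plain function so it can be checked outside
Spark (a simplified whitespace-based version of what my flatMap body does):

```scala
// Split a tweet into words on any run of whitespace (space, tab, newline).
// Empty strings are filtered out so leading whitespace produces no "" words.
def tokenize(text: String): Seq[String] =
  text.split("\\s+").toSeq.filter(_.nonEmpty)
```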


When I try to print every element in wordRDD, I get the following error. I
know the tweets contain newline characters, spaces, and tabs, but what causes
this NPE?

Does "Iterator$$anon$13.hasNext" in the trace mean that, while iterating
through the RDD, the next value is null?


15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
        at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:85)
        at $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:73)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Can I avoid this error by wrapping the word in a scala.Option?
If anybody knows why this happens, please help.
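What I have in mind is something like the following (a runnable sketch using a
plain java.util.HashMap as a stand-in for the BSONObject; my assumption is that
get returns null when a record has no "text" field, which is what would make
.toString throw the NPE):

```scala
val doc = new java.util.HashMap[String, AnyRef]() // stand-in for a BSONObject
doc.put("_id", "558baf")                          // note: no "text" field at all

// Inside the flatMap, this expression would replace arg._2.get("text").toString:
// Option(null) is None, so a record without "text" contributes no words.
val words: Seq[String] = Option(doc.get("text"))
  .map(_.toString)
  .toSeq
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)

println(words)                                    // prints List() -- and no NPE
```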

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-endup-with-NPE-tp24264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
