Re: NullPointerException from '.count.foreachRDD'

2014-08-20 Thread anoldbrain
Looking at the source code of DStream.scala:


   /**
    * Return a new DStream in which each RDD has a single element generated
    * by counting each RDD of this DStream.
    */
   def count(): DStream[Long] = {
     this.map(_ => (null, 1L))
         .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
         .reduceByKey(_ + _)
         .map(_._2)
   }

The transform line is the one throwing the NullPointerException. Can anyone give
some hints as to what would cause _ to be null (it is indeed null)? This only
happens when there is no data to process.

When there's data, no NullPointerException is thrown, and all the
processing/counting proceeds correctly.
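
For context, the pipeline looks roughly like this (a minimal sketch; stream
stands in for my custom InputDStream):

    stream.count().foreachRDD { rdd =>
      // count() emits one RDD per batch containing a single Long
      rdd.collect().foreach(c => println(s"records in this batch: $c"))
    }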

I am using my own custom InputDStream. Could that be the source of the
problem?






RE: NullPointerException from '.count.foreachRDD'

2014-08-20 Thread Shao, Saisai
Hi,

I don't think there is an NPE issue when using DStream.count(), even when no
data is fed into Spark Streaming. I tested with Kafka in my local setup, and
both cases, with and without data consumed, work fine.

Actually, you can see the details in ReceiverInputDStream: even when there is no
data in a batch duration, it generates an empty BlockRDD, so the map() and
transform() inside the count() operator never hit an NPE. I think the problem
lies in your customized InputDStream; you should make sure it generates an
empty RDD even when there is no data coming in.
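
A minimal sketch of what the compute() of such a custom InputDStream could look
like (the class name and the fetchBatch helper are hypothetical, made up for
illustration; the only point is that it returns Some(empty RDD) instead of None
when a batch has no data):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    // Hypothetical custom InputDStream; fetchBatch stands in for whatever
    // source-specific logic produces the records of a batch.
    class MyInputDStream[T: ClassTag](ssc_ : StreamingContext, fetchBatch: Time => Seq[T])
      extends InputDStream[T](ssc_) {

      override def start(): Unit = {}
      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val records = fetchBatch(validTime)
        if (records.isEmpty) {
          // Return an empty RDD rather than None, so downstream operators
          // (e.g. the transform() inside count()) never see a null RDD.
          Some(context.sparkContext.parallelize(Seq.empty[T]))
        } else {
          Some(context.sparkContext.parallelize(records))
        }
      }
    }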

Thanks
Jerry




RE: NullPointerException from '.count.foreachRDD'

2014-08-20 Thread anoldbrain
Thank you for the reply. I had implemented my InputDStream to return None when
there's no data. After changing it to return an empty RDD, the exception is
gone.

I am curious, though, why all my other processing worked correctly with the old,
incorrect implementation, with or without data. My actual code, without the
count() part, did a glom() followed by foreachRDD().
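
Roughly like this (a sketch; the real per-partition processing is replaced by a
println here, and stream again stands for my custom InputDStream):

    stream.glom().foreachRDD { rdd =>
      // glom() turns each partition of the batch into an Array[T];
      // here we just log how many records each partition held
      rdd.collect().foreach(partition => println(s"partition size: ${partition.length}"))
    }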

I have a StatsReportListener registered, and when there was no data there was
no StatsReportListener output. I had assumed this was Spark being smart and not
launching workers when there was no data; I wouldn't have guessed it was
actually a sign that my InputDStream implementation was wrong. On the other
hand, why is the return type Option if None should not be used at all?

Thanks for helping me solve my problem.


