A guess: parseRecord is returning None in some cases (probably for empty
lines), and then entry.get is throwing the exception.

You may want to filter the None values from accessLogDStream before you run
the map function over it.
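
For example, you could replace the map/entry.get pair with flatMap, which
drops the None results and unwraps the Some results in one step. A minimal
sketch (untested), assuming parseRecord returns an Option and the parsed
record exposes clientIpAddress as in your snippet:

def countByIp(lines: DStream[String]) = {
  val parser = new AccessLogParser
  // flatMap discards Nones and unwraps Somes, so no entry.get is needed
  val accessLogDStream = lines.flatMap(line => parser.parseRecord(line))
  val ipDStream = accessLogDStream.map(entry => (entry.clientIpAddress, 1))
  ipDStream.reduceByKey((x, y) => x + y)
}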

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which line is line 42 in your code?
>
> When the variable lines becomes empty, you can stop your program.
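>
> For example (untested sketch; the empty-batch counter and the threshold
> of three consecutive empty batches are just illustrative):
>
> var emptyBatches = 0
> lines.foreachRDD { rdd =>
>   if (rdd.isEmpty()) emptyBatches += 1 else emptyBatches = 0
> }
> ssc.start()
> // Poll every 10 seconds; stop once 3 consecutive batches arrive empty.
> while (!ssc.awaitTerminationOrTimeout(10000) && emptyBatches < 3) {}
> ssc.stop(stopSparkContext = true, stopGracefully = true)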
>
> Cheers
>
> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>
> I am working with the Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc
> on port 7777.
>
> The script looks like this:
>
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
>
> for zfile in $zipped_files
> do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile | nc -l -p 7777 -q 20
> done
>
> I have streaming code written in Scala that processes the streams. It
> works well for the most part, but when it runs out of files to stream I get
> the following error in Spark:
>
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> ....
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>
> How do I implement a graceful shutdown so that the program exits
> cleanly when it no longer detects any data in the stream?
>
> My Spark Streaming code looks like this:
>
> import org.apache.log4j.Logger
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.dstream.DStream
>
> object StreamingLogEnhanced {
>   def main(args: Array[String]) {
>     val master = args(0)
>     val conf =
>       new SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>     // Create a StreamingContext with a 10-second batch size
>     val ssc = new StreamingContext(conf, Seconds(10))
>     val log = Logger.getLogger(getClass.getName)
>
>     sys.ShutdownHookThread {
>       log.info("Gracefully stopping Spark Streaming Application")
>       ssc.stop(true, true)
>       log.info("Application stopped")
>     }
>     // Create a DStream from all the input on port 7777
>     val lines = ssc.socketTextStream("localhost", 7777)
>     // Create a count of log hits by IP address
>     val ipCounts = countByIp(lines)
>     ipCounts.print()
>
>     // Start our streaming context and wait for it to "finish"
>     ssc.start()
>     // Wait up to 10000 * 600 ms, then exit
>     ssc.awaitTermination(10000 * 600)
>     ssc.stop()
>   }
>
>   def countByIp(lines: DStream[String]) = {
>     val parser = new AccessLogParser
>     val accessLogDStream = lines.map(line => parser.parseRecord(line))
>     // This is line 42: entry.get throws NoSuchElementException
>     // whenever parseRecord returned None
>     val ipDStream = accessLogDStream.map(entry =>
>       (entry.get.clientIpAddress, 1))
>     ipDStream.reduceByKey((x, y) => x + y)
>   }
> }
>
> Thanks in advance for any suggestions.
>
>
