Hi, can you try this:
    val lines = FlumeUtils.createStream(ssc, "localhost", 9999)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis()).print()
    lines.foreachRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com <bit1...@163.com> wrote:

> Hi,
> I am trying the Spark Streaming + Flume example:
>
> 1. Code
>
> object SparkFlumeNGExample {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("SparkFlumeNGExample")
>     val ssc = new StreamingContext(conf, Seconds(10))
>     val lines = FlumeUtils.createStream(ssc, "localhost", 9999)
>     // Print out the count of events received from this server in each batch
>     lines.count().map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis()).print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> 2. I submit the application with the following command:
>
> ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar
>
> When I write data to Flume, I only see the following console output showing that input blocks are being added:
>
> storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB)
> 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 1424151810000 ms
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 1424151820000 ms
> ....
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 1424151870000 ms
>
> But I didn't see the output from the code: "Received X flume events"
>
> I have no idea where the problem is. Any ideas? Thanks

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
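For reference, a minimal self-contained sketch of the suggestion above, assuming the spark-streaming-flume artifact is packaged with the job and that the Flume avro sink is configured to push events to the same host/port passed to createStream:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object SparkFlumeNGExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkFlumeNGExample")
        val ssc = new StreamingContext(conf, Seconds(10))

        // The host/port here must match the Flume avro sink that sends the events.
        val lines = FlumeUtils.createStream(ssc, "localhost", 9999)

        // print() is an output operation that runs on the driver, so the per-batch
        // counts appear in the driver's console.
        lines.count()
          .map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis())
          .print()

        // The println inside foreachRDD executes on the executors, so this debug
        // output lands in the executors' stdout logs, not the driver console.
        lines.foreachRDD(_.foreach(println))

        ssc.start()
        ssc.awaitTermination()
      }
    }

Note that without at least one output operation such as print() or foreachRDD, the batch jobs are scheduled but do no visible work, which is consistent with seeing only "Added jobs for time ..." in the log.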