What is the error you are getting when you say "I was trying to write the data to HDFS, but it fails"?
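In the meantime, here is one way to do it: map each SparkFlumeEvent to the
String payload of its Avro body, then save each batch to its own HDFS
directory. This is a rough, untested sketch against the Spark 1.x Flume API
(flumeStream is the stream from your code below; the output path is just a
placeholder, adjust it for your cluster):

    import java.nio.ByteBuffer;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    // Convert each SparkFlumeEvent to the String contents of its Avro body.
    JavaDStream<String> lines = flumeStream.map(
        new Function<SparkFlumeEvent, String>() {
            @Override
            public String call(SparkFlumeEvent event) throws Exception {
                // Duplicate the buffer and copy the bytes out; array() can
                // include bytes outside the buffer's position/limit.
                ByteBuffer body = event.event().getBody().duplicate();
                byte[] bytes = new byte[body.remaining()];
                body.get(bytes);
                return new String(bytes, "UTF-8");
            }
        });

    // saveAsTextFile writes a directory of part files and fails if the
    // path already exists, so key each non-empty batch by its batch time.
    lines.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd, Time time) throws Exception {
            if (rdd.count() > 0) {
                rdd.saveAsTextFile("hdfs:///user/flume/out-" + time.milliseconds());
            }
            return null;
        }
    });

If this still fails, the actual exception from the executor logs would help.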
TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <
muthu.x.sundaram....@sabre.com> wrote:

> I am new to Spark. I am trying to do the following:
>
> Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS.
>
> My Flume config file has the following setup:
>
> Source = netcat
> Sink = avrosink
>
> Spark Streaming code:
>
> I am able to print data from Flume to the monitor, but I am struggling to
> create a file. In order to get the real data I need to convert each
> SparkFlumeEvent to an AvroFlumeEvent.
>
> JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a
> collection of SparkFlumeEvent. Do I need to convert it into a
> JavaRDD<AvroFlumeEvent>? Please share any code examples… Thanks.
>
> Code:
>
>     Duration batchInterval = new Duration(2000);
>     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
>     JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
>
>     flumeStream.count();
>
>     flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, Time, Void>() {
>         @Override
>         public Void call(JavaRDD<SparkFlumeEvent> events, Time time) throws Exception {
>             events.saveAsTextFile("output.txt");
>             return null;
>         }
>     });
>
>     /*flumeStream.count().map(new Function<Long, String>() {
>         @Override
>         public String call(Long in) {
>             return "Received " + in + " flume events.";
>         }
>     }).print();*/
>
>     flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
>         @Override
>         public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>             String logRecord = null;
>             List<SparkFlumeEvent> events = eventsData.collect();
>             Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
>
>             long t1 = System.currentTimeMillis();
>             AvroFlumeEvent avroEvent = null;
>             ByteBuffer bytePayload = null;
>
>             // All the user-level data is carried as the payload of the Flume event
>             while (batchedEvents.hasNext()) {
>                 SparkFlumeEvent flumeEvent = batchedEvents.next();
>                 avroEvent = flumeEvent.event();
>                 bytePayload = avroEvent.getBody();
>                 logRecord = new String(bytePayload.array());
>
>                 System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
>
>                 // ??I was trying to write the data to hdfs.. but it fails…
>             }
>             System.out.println("Processed this batch in: " +
>                 (System.currentTimeMillis() - t1) / 1000 + " seconds");
>             return null;
>         }
>     });