What is the error you are getting when you say "I was trying to write the data to HDFS, but it fails"?
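
If it helps, here is a rough sketch of one way to write the payloads out, assuming the event body is UTF-8 text (the HDFS path below is just a placeholder, not something from your setup):

    // Pull the payload out of each Flume event so the stream carries plain strings.
    JavaDStream<String> lines = flumeStream.map(new Function<SparkFlumeEvent, String>() {
      @Override
      public String call(SparkFlumeEvent event) throws Exception {
        // assumes the body is an array-backed ByteBuffer holding UTF-8 text
        return new String(event.event().getBody().array(), "UTF-8");
      }
    });

    // Write each batch to its own directory: saveAsTextFile creates a directory
    // of part files and fails if that path already exists.
    lines.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd.count() > 0) {
          rdd.saveAsTextFile("hdfs://namenode:8020/user/spark/flume-out-" + time.milliseconds());
        }
        return null;
      }
    });

You will need Function and Function2 from org.apache.spark.api.java.function and Time from org.apache.spark.streaming; adjust the path for your cluster.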

TD


On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <
muthu.x.sundaram....@sabre.com> wrote:

> I am new to Spark. I am trying to do the following:
>
> Netcat -> Flume -> Spark Streaming (process the Flume data) -> HDFS.
>
>
>
> My Flume config file has the following setup:
>
> Source = netcat
>
> Sink = avro sink
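>
> A minimal sketch of that kind of agent (host names and ports below are
> placeholders, not my real values):
>
> agent.sources = netcatSrc
> agent.channels = memChannel
> agent.sinks = avroSink
>
> agent.sources.netcatSrc.type = netcat
> agent.sources.netcatSrc.bind = localhost
> agent.sources.netcatSrc.port = 44444
> agent.sources.netcatSrc.channels = memChannel
>
> agent.channels.memChannel.type = memory
>
> agent.sinks.avroSink.type = avro
> agent.sinks.avroSink.hostname = <host where the Spark receiver runs>
> agent.sinks.avroSink.port = <port passed to FlumeUtils.createStream>
> agent.sinks.avroSink.channel = memChannel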
>
>
>
> Spark Streaming code:
>
> I am able to print the data from Flume to the console, but I am struggling to
> create a file. To get at the real data I need to convert each SparkFlumeEvent
> into an AvroFlumeEvent.
>
> JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of
> SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent>?
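>
> (By converting I mean something along these lines, if that is even the right
> direction; "events" here stands for the JavaRDD<SparkFlumeEvent> handed to
> foreachRDD:
>
> JavaRDD<AvroFlumeEvent> avroEvents = events.map(
>     new Function<SparkFlumeEvent, AvroFlumeEvent>() {
>       @Override
>       public AvroFlumeEvent call(SparkFlumeEvent e) throws Exception {
>         return e.event();   // unwrap the Avro event inside the Spark wrapper
>       }
>     });
>
> but I do not know how to get from there to a file on HDFS.)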
>
> Please share any code examples… Thanks.
>
>
>
> Code:
>
>
>
> Duration batchInterval = new Duration(2000);
> SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
> JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
>
> flumeStream.count();
>
> // Save each batch of events as text. Note: saveAsTextFile writes a directory
> // of part files at the given path, not a single file.
> flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
>   @Override
>   public Void call(JavaRDD<SparkFlumeEvent> events1) throws Exception {
>     events1.saveAsTextFile("output.txt");
>     return null;
>   }
> });
>
> /*flumeStream.count().map(new Function<Long, String>() {
>   @Override
>   public String call(Long in) {
>     return "Received " + in + " flume events.";
>   }
> }).print();*/
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
>   @Override
>   public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>     String logRecord = null;
>     List<SparkFlumeEvent> events = eventsData.collect();
>     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
>
>     long t1 = System.currentTimeMillis();
>     AvroFlumeEvent avroEvent = null;
>     ByteBuffer bytePayload = null;
>
>     // All the user-level data is carried as the payload of the Flume event
>     while (batchedEvents.hasNext()) {
>       SparkFlumeEvent flumeEvent = batchedEvents.next();
>       avroEvent = flumeEvent.event();
>       bytePayload = avroEvent.getBody();
>       logRecord = new String(bytePayload.array());
>
>       System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
>
>       // I was trying to write the data to HDFS, but it fails
>     }
>
>     System.out.println("Processed this batch in: " +
>         (System.currentTimeMillis() - t1) / 1000 + " seconds");
>     return null;
>   }
> });
>
>
>
