I tried to map the SparkFlumeEvents to an RDD of Strings as shown below, but neither the map nor its call() is ever executed. I might be going about this the wrong way; any help would be appreciated.
    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            System.out.println("<<<<<<Inside foreach...call>>>>");
            JavaRDD<String> records = eventsData.map(
                new Function<SparkFlumeEvent, String>() {
                    @Override
                    public String call(SparkFlumeEvent flume) throws Exception {
                        System.out.println("<<<<<<Inside map...call>>>>");
                        AvroFlumeEvent avroEvent = flume.event();
                        ByteBuffer bytePayload = avroEvent.getBody();
                        String logRecord = new String(bytePayload.array());
                        System.out.println("<<<<Record is " + logRecord);
                        return logRecord;
                    }
                });
            return null;
        }
    });

-----Original Message-----
From: Sundaram, Muthu X. [mailto:muthu.x.sundaram....@sabre.com]
Sent: Tuesday, July 22, 2014 10:24 AM
To: user@spark.apache.org; d...@spark.incubator.apache.org
Subject: Transforming Flume events using Spark transformation functions

Hi All,

I am getting events from Flume using the following line:

    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use transformation functions such as map and reduce on this stream. Do I need to convert the JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these functions directly?

I need to do the following kind of operations on records like these:

    XXXX AA
    YYYYY Delta
    TTTTT AA
    CCCC Southwest
    XXXX AA

The unique tickets are XXXX, YYYYY, TTTTT, CCCC, with counts XXXX 2, YYYYY 1, TTTTT 1, and so on. Per airline: AA - 2 tickets (the duplicate should not be counted), Delta - 1 ticket, Southwest - 1 ticket.

I have to do transformations like this. Right now I am able to receive records, but I am struggling to transform them using Spark transformation functions, since they are not of type JavaRDD<String>. Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below:

    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            // All the user-level data is carried as payload in the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());
                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
            }
            return null;
        }
    });

Where do I create the new JavaRDD<String>? Do I create it before this loop? How do I create it? In the loop I am able to get every record and print it. I appreciate any help here.

Thanks,
Muthu
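
A likely explanation for the map at the top of this thread never running: Spark transformations such as map() are lazy, and nothing in that snippet invokes an action on `records`, so the map stage is never scheduled. Note also that a println inside map() runs on the executors, so on a cluster its output appears in the executor logs, not the driver console. Below is a minimal sketch of the same foreach with an action added; collect() is just one choice of action (count() or foreachPartition() would also work) and is only safe when each batch fits in driver memory.

    // Sketch, assuming the Spark Streaming 1.x Java API. map() is lazy: the
    // inner call() only runs once an action is invoked on the resulting RDD.
    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            JavaRDD<String> records = eventsData.map(
                new Function<SparkFlumeEvent, String>() {
                    @Override
                    public String call(SparkFlumeEvent flume) throws Exception {
                        // Decode the Flume event payload into a String record.
                        return new String(flume.event().getBody().array());
                    }
                });
            // collect() is an action: it forces the lazy map() above to run
            // and brings the decoded records back to the driver for printing.
            for (String record : records.collect()) {
                System.out.println("LOG RECORD = " + record);
            }
            return null;
        }
    });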
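
For the ticket counting described in the quoted message, there is no need to collect() and loop: the stream itself can be converted once with JavaDStream.map, after which the usual pair operations apply. The sketch below is one possible approach, assuming Spark Streaming 1.x, that each record is a whitespace-delimited "TICKET AIRLINE" pair as in the example above, and that deduplication happens within each batch; the names `lines`, `uniqueLines`, and `ticketsPerAirline` are illustrative, not from the original post.

    // Sketch. Needs imports: org.apache.spark.api.java.function.{Function,
    // Function2, PairFunction}, org.apache.spark.streaming.api.java.
    // {JavaDStream, JavaPairDStream}, and scala.Tuple2.

    // Convert the event stream to a stream of String records once, up front.
    JavaDStream<String> lines = flumeStream.map(
        new Function<SparkFlumeEvent, String>() {
            @Override
            public String call(SparkFlumeEvent flume) throws Exception {
                return new String(flume.event().getBody().array());
            }
        });

    // Drop duplicate records within each batch, so the repeated "XXXX AA"
    // line is only counted once for AA.
    JavaDStream<String> uniqueLines = lines.transform(
        new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                return rdd.distinct();
            }
        });

    // Count distinct tickets per airline (the second field of each record).
    JavaPairDStream<String, Integer> ticketsPerAirline = uniqueLines
        .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String record) throws Exception {
                String[] fields = record.split("\\s+");
                return new Tuple2<String, Integer>(fields[1], 1);
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

    // print() is an output operation, so it triggers execution each batch.
    ticketsPerAirline.print();

One caveat on the design: distinct() here only removes duplicates within a single batch interval; de-duplicating tickets across batches would need a stateful operation such as updateStateByKey.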