Hi all,

I am getting events from Flume using the following line:

    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
Each event is a delimited record. I would like to use transformation functions such as map and reduce on these records. Do I need to convert the JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these functions directly?

I need to do the following kind of operations. Given these records:

    XXXX   AA
    YYYYY  Delta
    TTTTT  AA
    CCCC   Southwest
    XXXX   AA

the unique tickets are XXXX, YYYYY, TTTTT and CCCC. The count per ticket is XXXX 2, YYYYY 1, TTTTT 1, and so on. Per airline: AA: 2 tickets (the duplicate XXXX should not be counted), Delta: 1 ticket, Southwest: 1 ticket.

I have to do transformations like this. Right now I am able to receive the records, but I am struggling to transform them with Spark transformation functions because they are not of type JavaRDD<String>. Can I create a new JavaRDD<String>, and if so, how?

I loop through the events like this:

    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            // All the user-level data is carried as the payload of the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());
                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
            }
            return null;
        }
    });

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I create it? Inside the loop I can get every record and print it.

I appreciate any help here.

Thanks,
Muthu
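A possible direction, sketched under assumptions: there should be no need to collect() each batch to the driver. Calling flumeStream.map(...) with a Function<SparkFlumeEvent, String> returns a JavaDStream<String>, and inside foreach the same map on the JavaRDD<SparkFlumeEvent> returns a JavaRDD<String>. Per-key counts can then come from mapToPair followed by reduceByKey, and repeated records can be dropped with the RDD's distinct(). To stay runnable without a Spark classpath, the sketch below uses plain Java streams on a List<String> standing in for one batch; the per-record logic is what you would pass to map/mapToPair. It assumes each record is "TICKET AIRLINE" separated by whitespace (the real delimiter isn't stated) and that bodies are UTF-8; the class and method names (TicketCounts, decodeBody, fields, countPerTicket, uniqueTicketsPerAirline) are hypothetical. It also decodes only the buffer's remaining bytes, because ByteBuffer.array() returns the whole backing array, which can be larger than the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TicketCounts {

    // Assumption: fields are separated by whitespace; adjust to the real delimiter.
    static final String DELIMITER = "\\s+";

    // Decode only the bytes between position and limit, as UTF-8 (assumed charset).
    // duplicate() leaves the original buffer's position unchanged.
    static String decodeBody(ByteBuffer body) {
        ByteBuffer view = body.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Splits a record into its fields ("TICKET AIRLINE" under the assumption above).
    static String[] fields(String record) {
        return record.trim().split(DELIMITER);
    }

    // Count per ticket: the plain-Java analogue of
    // mapToPair(r -> (ticket, 1)) followed by reduceByKey(sum).
    static Map<String, Long> countPerTicket(List<String> records) {
        return records.stream()
                .map(TicketCounts::fields)
                .map(f -> f[0])
                .collect(Collectors.groupingBy(t -> t, TreeMap::new, Collectors.counting()));
    }

    // Distinct tickets per airline: drop repeated (ticket, airline) pairs first,
    // as distinct() would, then count the remaining pairs per airline.
    static Map<String, Long> uniqueTicketsPerAirline(List<String> records) {
        return records.stream()
                .map(TicketCounts::fields)
                .filter(f -> f.length >= 2)              // skip malformed records
                .map(f -> Arrays.asList(f[0], f[1]))     // (ticket, airline)
                .distinct()
                .collect(Collectors.groupingBy(p -> p.get(1), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // One batch of records; the first is decoded from a ByteBuffer as a Flume body would be.
        List<String> batch = Arrays.asList(
                decodeBody(ByteBuffer.wrap("XXXX AA".getBytes(StandardCharsets.UTF_8))),
                "YYYYY Delta", "TTTTT AA", "CCCC Southwest", "XXXX AA");
        System.out.println(countPerTicket(batch));
        System.out.println(uniqueTicketsPerAirline(batch));
    }
}
```

Run as-is, main prints {CCCC=1, TTTTT=1, XXXX=2, YYYYY=1} followed by {AA=2, Delta=1, Southwest=1}, matching the counts described in the question. Dropping duplicate (ticket, airline) pairs before counting is what keeps the second XXXX from being counted twice for AA.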