Hi,

collect(partialFunction) is equivalent to filter(x => partialFunction.isDefinedAt(x)).map(partialFunction), so it's functionally equivalent to your expression. I favor collect for its more compact form, but that's a personal preference. Use what you feel reads best.
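To make that concrete, here is a minimal sketch of the equivalence (assuming an existing SparkContext `sc`; the topic and data values are made up for the example):

// Illustrative only: assumes an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq(("topicA", "a1"), ("topicB", "b1"), ("topicA", "a2")))
val topic = "topicA"

// A partial function defined only for pairs whose key matches `topic`.
val pf: PartialFunction[(String, String), String] = {
  case (t, data) if t == topic => data
}

val viaCollect   = pairs.collect(pf)                    // collect with a partial function
val viaFilterMap = pairs.filter(pf.isDefinedAt).map(pf) // the equivalent filter + map

// Both yield Array(a1, a2) when materialized:
assert(viaCollect.collect().sameElements(viaFilterMap.collect()))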
Regarding performance, there will be some overhead from submitting a separate task for every filtered RDD that gets materialized to Cassandra. That's the reason I proposed the ticket I linked earlier (SPARKC-257). Have a look at whether it would improve your particular use case, and vote for it if so :-)

-kr, Gerard.

On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com> wrote:

> Thanks Gerard, the code snippet you shared worked, but can you please
> explain/point me to the usage of *collect* here. How is it
> different (performance/readability) from *filter*?
>
>> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
>
> I am doing something like this. Please tell me if I can improve the
> *processing time* of this particular code:
>
> kafkaStringStream.foreachRDD { rdd =>
>   val topics = rdd.map(_._1).distinct().collect()
>   if (topics.length > 0) {
>     val rdd_value = rdd.take(10).mkString("\n.....\n")
>     Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))
>
>     topics.foreach { topic =>
>       // rdd.filter(x => x._1 == topic).map(_._2)
>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>     }
>     updateOffsetsinZk(rdd)
>   }
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Something like this?
>>
>> I'm making the assumption that your topic name equals your keyspace for
>> this filtering example.
>>
>> dstream.foreachRDD { rdd =>
>>   val topics = rdd.map(_._1).distinct.collect
>>   topics.foreach { topic =>
>>     // do not confuse this collect (with a partial function) with
>>     // rdd.collect(), which brings data to the driver
>>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>     filteredRdd.saveToCassandra(topic, "table")
>>   }
>> }
>>
>> I'm wondering: would something like this (
>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>> purposes?
>>
>> -kr, Gerard.
>>
>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> Can you please give an example of how to achieve this:
>>>
>>>> *I would also look at filtering by topic and saving as different
>>>> DStreams in your code*
>>>
>>> I have managed to get a DStream[(String, String)], which is a
>>> (*topic*, *my_data*) tuple. Let's call it kafkaStringStream.
>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>> DStream[(String, Iterable[String])].
>>> But I want a DStream instead of the Iterable in order to apply
>>> saveToCassandra for storing it.
>>>
>>> Please help with how to transform the Iterable into a DStream, or any
>>> other workaround for achieving the same.
>>>
>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>>
>>>> On top of that, you could make the topic part of the key (e.g. keyBy
>>>> in .transform or manually emitting a tuple) and use one of the
>>>> .xxxByKey operators for the processing.
>>>>
>>>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>>>> topics) and the processing is *really* different, I would also look at
>>>> filtering by topic and saving as different DStreams in your code.
>>>>
>>>> Either way, you need to start with Cody's tip in order to extract the
>>>> topic name.
>>>>
>>>> -adrian
>>>>
>>>> From: Cody Koeninger
>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>> To: Udit Mehta
>>>> Cc: user
>>>> Subject: Re: Kafka Direct Stream
>>>>
>>>> You can get the topic for a given partition from the offset range.
>>>> You can either filter using that, or just have a single RDD and match
>>>> on topic when doing mapPartitions or foreachPartition (which I think
>>>> is a better idea).
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using the Spark direct stream to consume from multiple topics in
>>>>> Kafka. I am able to consume fine, but I am stuck on how to separate
>>>>> the data for each topic, since I need to process the data differently
>>>>> depending on the topic.
>>>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>>>> each having 1 topic.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>> Udit
>>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
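For reference, a minimal sketch of the per-partition approach Cody describes above, assuming Spark 1.x with the spark-streaming-kafka artifact, a direct stream `dstream` of (key, message) pairs, and a hypothetical per-topic handler `processFor`:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

dstream.foreachRDD { rdd =>
  // Each partition of a direct-stream RDD corresponds 1:1 to a Kafka
  // (topic, partition), so the offset ranges are indexed by partition id.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Look up the topic for this partition from its offset range.
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    val topic = osr.topic
    iter.foreach { case (_, message) =>
      processFor(topic, message) // hypothetical: dispatch on the topic
    }
  }
}

Compared with creating one filtered RDD per topic, this makes a single pass over the data and avoids submitting a separate job per topic, which is the overhead Gerard mentions at the top of the thread.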