You can try something like this to filter by topic:

val kafkaStringStream = KafkaUtils.createDirectStream[.......]

// you might want to create the stream by fetching offsets from ZK
kafkaStringStream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    val rdd_value = rdd.take(10).mkString("\n.....\n")
    Logger.log(this.getClass, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      // keep only the messages belonging to this topic
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      // do anything with this filteredRdd, like saving it to a data store
    }
    // update the offsets in ZK
    ZookeeperManager.updateOffsetsinZk(rdd)
  }
}
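If you do want to seed the stream from the offsets saved in ZK, this is roughly what that variant of createDirectStream looks like (readOffsetsFromZk is a hypothetical helper here; substitute whatever you use to read the saved offsets back):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// hypothetical helper returning the offsets previously written to ZK
val fromOffsets: Map[TopicAndPartition, Long] = ZookeeperManager.readOffsetsFromZk()

val kafkaStringStream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets,
    // emit (topic, message) pairs so the per-topic handling above keeps working
    (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()))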
Regards
Varun

On Thu, Oct 22, 2015 at 2:44 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, that's the general idea.
>
> Regarding the question in your code comments: the code inside of
> foreachPartition is what runs on the executor. It wouldn't make any
> sense to try to get a partition ID before that block.
>
> On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens <dari...@blackberry.com>
> wrote:
>
>> Cody,
>>
>> First off, thanks for your contributions and blog post; I actually
>> linked to it in my original question. You'll have to forgive me, as I've
>> only been using Spark and writing Scala for a few days. I'm aware that the
>> RDD partitions are 1:1 with Kafka topic partitions and that you can get
>> the offset ranges, but my understanding is that the code below would need
>> to be executed after the stream has been processed.
>>
>> Let's say we're storing our filters in a key-value map where the key is
>> the topic name and the value is a string that a message within a
>> partition of that topic must contain in order to match.
>>
>> Is this the approach you're suggesting (using your example code)?
>>
>> // This would get built up on the driver, likely fetching the topics
>> // and filters from ZK
>> val topicFilters = Map("topic1" -> "this text must match",
>>   "topic2" -> "this other text must match")
>>
>> val stream = KafkaUtils.createDirectStream(...)
>> ...
>> stream.foreachRDD { rdd =>
>>   // Cast the rdd to an interface that lets us get an array of OffsetRange
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   rdd.foreachPartition { iter =>
>>     // index to get the correct offset range for the rdd partition
>>     // we're working on
>>     val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>>
>>     // get any needed data from the offset range
>>     val topic = osr.topic
>>     val kafkaPartitionId = osr.partition
>>     val begin = osr.fromOffset
>>     val end = osr.untilOffset
>>
>>     // Now that we know the topic name, we can filter something.
>>     // Or could we have referenced the topic name from
>>     // offsetRanges(TaskContext.get.partitionId) earlier,
>>     // before we entered stream.foreachRDD...?
>>   }
>> }
>>
>> *From:* Cody Koeninger [mailto:c...@koeninger.org]
>> *Sent:* Wednesday, October 21, 2015 3:01 PM
>> *To:* Dave Ariens
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Kafka Streaming and Filtering > 3000 partitions
>>
>> The rdd partitions are 1:1 with Kafka topic-partitions, so you can use
>> offset ranges to figure out which topic a given rdd partition is for and
>> proceed accordingly. See the Kafka integration guide in the Spark
>> Streaming docs for more details, or
>> https://github.com/koeninger/kafka-exactly-once
>>
>> As far as setting offsets in ZK, there's a private interface in the
>> Spark codebase that would make it a little easier for you to do that. You
>> can see that code for reference, or there's an outstanding ticket for
>> making it public: https://issues.apache.org/jira/browse/SPARK-10963
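For reference, a completed version of that sketch, with the per-topic filter applied inside foreachPartition along the lines Cody confirms above (it assumes (topic, message) string pairs and the topicFilters map; producing to the destination topic is left as a stub):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // capture the offset ranges on the driver before the per-partition work
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // executor-side: the partition id is only meaningful inside this block
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)

    // look up this topic's filter; partitions of unconfigured topics are skipped
    topicFilters.get(osr.topic).foreach { mustContain =>
      iter.filter { case (_, message) => message.contains(mustContain) }
          .foreach { case (_, message) =>
            // produce `message` to its topic-specific destination here
          }
    }
  }
}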
>>
>> On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens <dari...@blackberry.com>
>> wrote:
>>
>> Hey folks,
>>
>> I have a very large number of Kafka topics (many thousands of
>> partitions) that I want to consume, filter based on topic-specific
>> filters, and then produce back to filtered topics in Kafka.
>>
>> Using the receiver-less approach with Spark 1.4.1 (described here
>> <https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md>),
>> I am able to use either KafkaUtils.createDirectStream or
>> KafkaUtils.createRDD, consume from many topics, and filter them all with
>> the same filters, but I can't seem to wrap my head around how to apply
>> topic-specific filters, or how to finally produce to topic-specific
>> destination topics.
>>
>> Another point: I will need to checkpoint the metadata after each
>> successful batch and set the starting offsets per partition back to ZK. I
>> expect I can do that on the final RDDs after casting them accordingly,
>> but if anyone has expertise/guidance doing that and is willing to share,
>> I'd be pretty grateful.

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*