You can try something like this to filter by topic:

val kafkaStringStream = KafkaUtils.createDirectStream[.......]

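// You might want to create the stream from offsets fetched from ZK.
// Note: rdd.map(_._1) below assumes the stream emits (topic, message) pairs,
// e.g. via a messageHandler; by default the first element is the message key.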

kafkaStringStream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    val rdd_value = rdd.take(10).mkString("\n.....\n")
    Logger.log(this.getClass, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      //do anything with this filteredRdd, like saving to data store
    }
    //update the offsets
    ZookeeperManaager.updateOffsetsinZk(rdd)
  }
}
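
To apply a different filter per topic (as in Dave's topicFilters map further down), here is a rough sketch of the idea. A plain Seq stands in for the RDD of (topic, message) pairs so it runs without Spark or Kafka; the object name, the filterByTopic helper and the sample records are made up for illustration and are not from anyone's actual code in this thread.

```scala
object TopicFilterSketch {
  // Hypothetical filters: topic name -> substring a message must contain.
  val topicFilters: Map[String, String] = Map(
    "topic1" -> "this text must match",
    "topic2" -> "this other text must match")

  // Keep only messages whose topic has a filter and whose text contains it,
  // then group the surviving messages by topic.
  // A Seq stands in for the RDD of (topic, message) pairs.
  def filterByTopic(records: Seq[(String, String)]): Map[String, Seq[String]] =
    records
      .collect {
        case (topic, msg) if topicFilters.get(topic).exists(s => msg.contains(s)) =>
          (topic, msg)
      }
      .groupBy(_._1)
      .map { case (topic, pairs) => topic -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      "topic1" -> "a: this text must match",
      "topic1" -> "b: no match here",
      "topic2" -> "c: this other text must match",
      "topic3" -> "d: topic without a filter")

    filterByTopic(batch).foreach { case (topic, msgs) =>
      println(s"$topic -> ${msgs.mkString(", ")}")
    }
  }
}
```

On a real RDD the same partial function can be passed to rdd.collect { ... }, which filters all topics in one pass instead of once per topic. If the filter map is large, broadcasting it is probably worth considering.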

Regards

Varun


On Thu, Oct 22, 2015 at 2:44 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah, that's the general idea.
>
> Regarding the question in your code comments ... The code inside of
> foreachPartition is what's running on the executor.  It wouldn't make any
> sense to try to get a partition ID before that block.
>
> On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens <dari...@blackberry.com>
> wrote:
>
>> Cody,
>>
>>
>>
>> First off, thanks for your contributions and blog post, which I actually
>> linked to in my original question. You'll have to forgive me, as I've only
>> been using Spark and writing Scala for a few days. I'm aware that the RDD
>> partitions are 1:1 with Kafka topic partitions and that you can get the
>> offset ranges.  But my understanding is that the code below would need to
>> be executed after the stream has been processed.
>>
>>
>>
>> Let's say we're storing our filters in a key value map where the key is
>> the topic name, and the value is a string that a message within a partition
>> of that topic must contain to match.
>>
>>
>>
>> Is this the approach you're suggesting (using your example code)?
>>
>>
>>
>> // This would get built up on the driver, likely fetching the topic and
>> // filters from ZK
>> val topicFilters = Map(
>>   "topic1" -> "this text must match",
>>   "topic2" -> "this other text must match")
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>   ...
>>   stream.foreachRDD { rdd =>
>>     // Cast the rdd to an interface that lets us get an array of OffsetRange
>>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>     rdd.foreachPartition { iter =>
>>       // index to get the correct offset range for the rdd partition
>>       // we're working on
>>       val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>>
>>       // get any needed data from the offset range
>>       val topic = osr.topic
>>       val kafkaPartitionId = osr.partition
>>       val begin = osr.fromOffset
>>       val end = osr.untilOffset
>>
>>       // Now we know the topic name, we can filter something
>>       // Or could we have referenced the topic name from
>>       // offsetRanges(TaskContext.get.partitionId) earlier
>>       // before we entered into stream.foreachRDD...?
>>     }
>>   }
>>
>> *From:* Cody Koeninger [mailto:c...@koeninger.org]
>> *Sent:* Wednesday, October 21, 2015 3:01 PM
>> *To:* Dave Ariens
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Kafka Streaming and Filtering > 3000 partitons
>>
>>
>>
>> The rdd partitions are 1:1 with kafka topicpartitions, so you can use
>> offset ranges to figure out which topic a given rdd partition is for and
>> proceed accordingly.  See the kafka integration guide in the spark
>> streaming docs for more details, or
>> https://github.com/koeninger/kafka-exactly-once
>>
>>
>>
>> As far as setting offsets in ZK, there's a private interface in the spark
>> codebase that would make it a little easier for you to do that.  You can
>> see that code for reference, or there's an outstanding ticket for making it
>> public https://issues.apache.org/jira/browse/SPARK-10963
>>
>>
>>
>> On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens <dari...@blackberry.com>
>> wrote:
>>
>> Hey folks,
>>
>>
>>
>> I have a very large number of Kafka topics (many thousands of partitions)
>> that I want to consume, filter based on topic-specific filters, then
>> produce back to filtered topics in Kafka.
>>
>>
>>
>> Using the receiver-less approach with Spark 1.4.1 (described here
>> <https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md>)
>> I am able to use either KafkaUtils.createDirectStream or
>> KafkaUtils.createRDD, consume from many topics, and filter them with the
>> same filters but I can't seem to wrap my head around how to apply
>> topic-specific filters, or to finally produce to topic-specific destination
>> topics.
>>
>>
>>
>> Another point would be that I will need to checkpoint the metadata after
>> each successful batch and set starting offsets per partition back to ZK.  I
>> expect I can do that on the final RDDs after casting them accordingly, but
>> if anyone has any expertise/guidance doing that and is willing to share,
>> I'd be pretty grateful.
>>
>>
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*
